using System; using System.Collections.Generic; using System.Linq; using Newtonsoft.Json; using Umbraco.Core; using Umbraco.Core.Cache; using Umbraco.Core.Sync; using Umbraco.Web.Routing; using Umbraco.Core.Logging; using Umbraco.Core.Persistence; using Umbraco.Core.Persistence.Dtos; using Umbraco.Core.Scoping; using Umbraco.Core.Hosting; namespace Umbraco.Web { /// /// An implementation that works by storing messages in the database. /// /// /// This binds to appropriate umbraco events in order to trigger the Boot(), Sync() & FlushBatch() calls /// public class BatchedDatabaseServerMessenger : DatabaseServerMessenger, IBatchedDatabaseServerMessenger { private readonly IUmbracoDatabaseFactory _databaseFactory; private readonly IRequestCache _requestCache; private readonly IRequestAccessor _requestAccessor; public BatchedDatabaseServerMessenger( IMainDom mainDom, IUmbracoDatabaseFactory databaseFactory, IScopeProvider scopeProvider, ISqlContext sqlContext, IProfilingLogger proflog, IServerRegistrar serverRegistrar, DatabaseServerMessengerOptions options, IHostingEnvironment hostingEnvironment, CacheRefresherCollection cacheRefreshers, IRequestCache requestCache, IRequestAccessor requestAccessor) : base(mainDom, scopeProvider, sqlContext, proflog, serverRegistrar, true, options, hostingEnvironment, cacheRefreshers) { _databaseFactory = databaseFactory; _requestCache = requestCache; _requestAccessor = requestAccessor; } // invoked by DatabaseServerRegistrarAndMessengerComponent public void Startup() { _requestAccessor.EndRequest += UmbracoModule_EndRequest; if (_databaseFactory.CanConnect == false) { Logger.LogWarning("Cannot connect to the database, distributed calls will not be enabled for this server."); } else { Boot(); } } private void UmbracoModule_EndRequest(object sender, UmbracoRequestEventArgs e) { // will clear the batch - will remain in HttpContext though - that's ok FlushBatch(); } protected override void DeliverRemote(ICacheRefresher refresher, MessageType messageType, IEnumerable ids = null, string json = null) { var idsA = ids?.ToArray(); Type arrayType; if (GetArrayType(idsA, out arrayType) == false) throw new ArgumentException("All items must be of the same type, either int or Guid.", nameof(ids)); BatchMessage(refresher, messageType, idsA, arrayType, json); } public void FlushBatch() { var batch = GetBatch(false); if (batch == null) return; var instructions = batch.SelectMany(x => x.Instructions).ToArray(); batch.Clear(); //Write the instructions but only create JSON blobs with a max instruction count equal to MaxProcessingInstructionCount using (var scope = ScopeProvider.CreateScope()) { foreach (var instructionsBatch in instructions.InGroupsOf(Options.MaxProcessingInstructionCount)) { WriteInstructions(scope, instructionsBatch); } scope.Complete(); } } private void WriteInstructions(IScope scope, IEnumerable instructions) { var dto = new CacheInstructionDto { UtcStamp = DateTime.UtcNow, Instructions = JsonConvert.SerializeObject(instructions, Formatting.None), OriginIdentity = LocalIdentity, InstructionCount = instructions.Sum(x => x.JsonIdCount) }; scope.Database.Insert(dto); } protected ICollection GetBatch(bool create) { var key = nameof(BatchedDatabaseServerMessenger); if (!_requestCache.IsAvailable) return null; // no thread-safety here because it'll run in only 1 thread (request) at a time var batch = (ICollection)_requestCache.Get(key); if (batch == null && create) { batch = new List(); _requestCache.Set(key, batch); } return batch; } protected void BatchMessage( ICacheRefresher refresher, MessageType messageType, IEnumerable ids = null, Type idType = null, string json = null) { var batch = GetBatch(true); var instructions = RefreshInstruction.GetInstructions(refresher, messageType, ids, idType, json); // batch if we can, else write to DB immediately if (batch == null) { //only write the json blob with a maximum count of the MaxProcessingInstructionCount using (var scope = ScopeProvider.CreateScope()) { foreach (var maxBatch in instructions.InGroupsOf(Options.MaxProcessingInstructionCount)) { WriteInstructions(scope, maxBatch); } scope.Complete(); } } else { batch.Add(new RefreshInstructionEnvelope(refresher, instructions)); } } } }