diff --git a/src/Umbraco.Core/Sync/DatabaseServerMessenger.cs b/src/Umbraco.Core/Sync/DatabaseServerMessenger.cs index 4e46c0ab5c..e8d5287edc 100644 --- a/src/Umbraco.Core/Sync/DatabaseServerMessenger.cs +++ b/src/Umbraco.Core/Sync/DatabaseServerMessenger.cs @@ -115,7 +115,11 @@ namespace Umbraco.Core.Sync { _released = true; // no more syncs } - _syncIdle.WaitOne(); // wait for pending sync + + // Wait for pending sync this is waiting for _syncIdle.Set() + // to be called. Until that is called, the appdomain cannot shut down! + // so whatever is locking this currently should hurry up! + _syncIdle.WaitOne(); }, weight); @@ -197,13 +201,15 @@ namespace Umbraco.Core.Sync { if (_syncing) return; - + + //Don't continue if we are released if (_released) return; if ((DateTime.UtcNow - _lastSync).TotalSeconds <= _options.ThrottleSeconds) return; + //Set our flag and the lock to be in it's original state (i.e. it can be awaited) _syncing = true; _syncIdle.Reset(); _lastSync = DateTime.UtcNow; @@ -215,6 +221,7 @@ namespace Umbraco.Core.Sync { ProcessDatabaseInstructions(); + //Check for pruning throttling if ((DateTime.UtcNow - _lastPruned).TotalSeconds <= _options.PruneThrottleSeconds) return; @@ -231,6 +238,7 @@ namespace Umbraco.Core.Sync } finally { + //We must reset our flag and signal any waiting locks _syncing = false; _syncIdle.Set(); } @@ -255,13 +263,14 @@ namespace Umbraco.Core.Sync // // FIXME not true if we're running on a background thread, assuming we can? + var sql = new Sql().Select("*") .From(_appContext.DatabaseContext.SqlSyntax) .Where(dto => dto.Id > _lastId) .OrderBy(dto => dto.Id, _appContext.DatabaseContext.SqlSyntax); - var dtos = _appContext.DatabaseContext.Database.Fetch(sql); - if (dtos.Count <= 0) return; + //only retrieve the max (just in case there's tons) + var topSql = _appContext.DatabaseContext.SqlSyntax.SelectTop(sql, _options.MaxProcessingInstructionCount); // only process instructions coming from a remote server, and ignore instructions coming from // the local server as they've already been processed. We should NOT assume that the sequence of @@ -269,7 +278,16 @@ namespace Umbraco.Core.Sync var localIdentity = LocalIdentity; var lastId = 0; - foreach (var dto in dtos) + + //this is used to determine if we need to exit the reader loop below because there are actually + // too many instructions to process. In which case we need to exit the reader so we can actually re-save + // the remaining instructions back to the same row (we cannot save something while inside a reader loop) + Tuple> maxInstructions = null; + + //IMPORTANT! We are doing a Query here instead of a Fetch, this means that it will open a data reader + // which we are iterating over instead of loading everything into memory and iterating over that. + // When doing this we always must use a for loop so that the Enumerator is disposed and the reader is closed. + foreach (var dto in _appContext.DatabaseContext.Database.Query(topSql)) { if (dto.OriginIdentity == localIdentity) { @@ -291,27 +309,73 @@ namespace Umbraco.Core.Sync continue; } - // execute remote instructions & update lastId - try - { - NotifyRefreshers(jsonA); - lastId = dto.Id; - } - catch (Exception ex) - { - _logger.Error( - string.Format("DISTRIBUTED CACHE IS NOT UPDATED. Failed to execute instructions ({0}: \"{1}\"). Instruction is being skipped/ignored", dto.Id, dto.Instructions), ex); + var instructionBatch = GetAllInstructions(jsonA); - //we cannot throw here because this invalid instruction will just keep getting processed over and over and errors - // will be thrown over and over. The only thing we can do is ignore and move on. - lastId = dto.Id; + // Here we should check if there's too many instructions, if there is we should split them and re-save the instructions entry with + // the trimmed instructions. We then don't update the lastsynced value so that this row is re-processed again but with only the remaining + // instructions in it. + if (instructionBatch.Count > _options.MaxProcessingInstructionCount) + { + maxInstructions = new Tuple>(dto, instructionBatch); + break; } + + //process as per-normal + lastId = ProcessDatabaseInstructions(instructionBatch, dto); + } + + //If this is not null this means we've found a row that has a ton of instructions in it and we'll need to process + // just a part of it and then re-save the remaining to the same row so that another request can deal with the data. + if (maxInstructions != null) + { + var remainingCount = maxInstructions.Item2.Count - _options.MaxProcessingInstructionCount; + + _logger.Info( + "Max processing instruction count reached. This batch will be processed now but the remaining {0} will be processed by subsequent requests.", () => remainingCount); + + var processingBatch = maxInstructions.Item2.GetRange(0, _options.MaxProcessingInstructionCount); + //NOTE: We are not persisting the lastId from the result of this method because we will need to re-process it + ProcessDatabaseInstructions(processingBatch, maxInstructions.Item1); + + //Save the instruction blob back to the DB with the trimmed instruction count + var remaining = maxInstructions.Item2.GetRange(_options.MaxProcessingInstructionCount - 1, remainingCount); + maxInstructions.Item1.UtcStamp = DateTime.UtcNow; + //serialize the remaining instructions (leave the original identity as-is) + maxInstructions.Item1.Instructions = JsonConvert.SerializeObject(remaining, Formatting.None); + ApplicationContext.DatabaseContext.Database.Update(maxInstructions.Item1); } if (lastId > 0) SaveLastSynced(lastId); } + private int ProcessDatabaseInstructions(List instructionBatch, CacheInstructionDto dto) + { + // execute remote instructions & update lastId + try + { + NotifyRefreshers(instructionBatch); + return dto.Id; + } + //catch (ThreadAbortException ex) + //{ + // //This will occur if the instructions processing is taking too long since this is occuring on a request thread. + // // Or possibly if IIS terminates the appdomain. In any case, we should deal with this differently perhaps... + //} + catch (Exception ex) + { + _logger.Error( + string.Format("DISTRIBUTED CACHE IS NOT UPDATED. Failed to execute instructions (id: {0}, instruction count: {1}). Instruction is being skipped/ignored", dto.Id, instructionBatch.Count), ex); + + //we cannot throw here because this invalid instruction will just keep getting processed over and over and errors + // will be thrown over and over. The only thing we can do is ignore and move on. + return dto.Id; + } + + ////if this is returned it will not be saved + //return -1; + } + /// /// Remove old instructions from the database /// @@ -459,8 +523,14 @@ namespace Umbraco.Core.Sync return jsonRefresher; } - private static void NotifyRefreshers(IEnumerable jsonArray) + /// + /// Parses out the individual instructions to be processed + /// + /// + /// + private static List GetAllInstructions(IEnumerable jsonArray) { + var result = new List(); foreach (var jsonItem in jsonArray) { // could be a JObject in which case we can convert to a RefreshInstruction, @@ -469,33 +539,45 @@ namespace Umbraco.Core.Sync if (jsonObj != null) { var instruction = jsonObj.ToObject(); - switch (instruction.RefreshType) - { - case RefreshMethodType.RefreshAll: - RefreshAll(instruction.RefresherId); - break; - case RefreshMethodType.RefreshByGuid: - RefreshByGuid(instruction.RefresherId, instruction.GuidId); - break; - case RefreshMethodType.RefreshById: - RefreshById(instruction.RefresherId, instruction.IntId); - break; - case RefreshMethodType.RefreshByIds: - RefreshByIds(instruction.RefresherId, instruction.JsonIds); - break; - case RefreshMethodType.RefreshByJson: - RefreshByJson(instruction.RefresherId, instruction.JsonPayload); - break; - case RefreshMethodType.RemoveById: - RemoveById(instruction.RefresherId, instruction.IntId); - break; - } - + result.Add(instruction); } else { - var jsonInnerArray = (JArray) jsonItem; - NotifyRefreshers(jsonInnerArray); // recurse + var jsonInnerArray = (JArray)jsonItem; + result.AddRange(GetAllInstructions(jsonInnerArray)); // recurse + } + } + return result; + } + + /// + /// executes the instructions against the cache refresher instances + /// + /// + private static void NotifyRefreshers(IEnumerable instructions) + { + foreach (var instruction in instructions) + { + switch (instruction.RefreshType) + { + case RefreshMethodType.RefreshAll: + RefreshAll(instruction.RefresherId); + break; + case RefreshMethodType.RefreshByGuid: + RefreshByGuid(instruction.RefresherId, instruction.GuidId); + break; + case RefreshMethodType.RefreshById: + RefreshById(instruction.RefresherId, instruction.IntId); + break; + case RefreshMethodType.RefreshByIds: + RefreshByIds(instruction.RefresherId, instruction.JsonIds); + break; + case RefreshMethodType.RefreshByJson: + RefreshByJson(instruction.RefresherId, instruction.JsonPayload); + break; + case RefreshMethodType.RemoveById: + RemoveById(instruction.RefresherId, instruction.IntId); + break; } } }