This changes the instruction fetch process to query only for the top records, based on MaxProcessingInstructionCount. The Fetch is changed to a Query so that the records are not all loaded into memory; a db reader is iterated instead. While reading, this checks whether any JSON blob contains more than MaxProcessingInstructionCount instructions and, if so, breaks out of the reader, takes the max amount to be processed and re-saves the remaining instructions back to the same record, so that this single request does not over-process instructions (which can take a long time and cause all sorts of problems).

This commit is contained in:
Shannon
2016-10-25 15:50:33 +02:00
parent 74a5bf5503
commit d7d4b26a66

View File

@@ -115,7 +115,11 @@ namespace Umbraco.Core.Sync
{
_released = true; // no more syncs
}
_syncIdle.WaitOne(); // wait for pending sync
// Wait for any pending sync: this blocks until _syncIdle.Set() is called.
// Until that is called, the appdomain cannot shut down,
// so whatever is currently holding this should hurry up!
_syncIdle.WaitOne();
},
weight);
@@ -197,13 +201,15 @@ namespace Umbraco.Core.Sync
{
if (_syncing)
return;
//Don't continue if we are released
if (_released)
return;
if ((DateTime.UtcNow - _lastSync).TotalSeconds <= _options.ThrottleSeconds)
return;
//Set our flag and the lock to be in its original state (i.e. it can be awaited)
_syncing = true;
_syncIdle.Reset();
_lastSync = DateTime.UtcNow;
@@ -215,6 +221,7 @@ namespace Umbraco.Core.Sync
{
ProcessDatabaseInstructions();
//Check for pruning throttling
if ((DateTime.UtcNow - _lastPruned).TotalSeconds <= _options.PruneThrottleSeconds)
return;
@@ -231,6 +238,7 @@ namespace Umbraco.Core.Sync
}
finally
{
//We must reset our flag and signal any waiting locks
_syncing = false;
_syncIdle.Set();
}
@@ -255,13 +263,14 @@ namespace Umbraco.Core.Sync
//
// FIXME not true if we're running on a background thread, assuming we can?
var sql = new Sql().Select("*")
.From<CacheInstructionDto>(_appContext.DatabaseContext.SqlSyntax)
.Where<CacheInstructionDto>(dto => dto.Id > _lastId)
.OrderBy<CacheInstructionDto>(dto => dto.Id, _appContext.DatabaseContext.SqlSyntax);
var dtos = _appContext.DatabaseContext.Database.Fetch<CacheInstructionDto>(sql);
if (dtos.Count <= 0) return;
//only retrieve the max (just in case there's tons)
var topSql = _appContext.DatabaseContext.SqlSyntax.SelectTop(sql, _options.MaxProcessingInstructionCount);
// only process instructions coming from a remote server, and ignore instructions coming from
// the local server as they've already been processed. We should NOT assume that the sequence of
@@ -269,7 +278,16 @@ namespace Umbraco.Core.Sync
var localIdentity = LocalIdentity;
var lastId = 0;
foreach (var dto in dtos)
//this is used to determine if we need to exit the reader loop below because there are actually
// too many instructions to process. In which case we need to exit the reader so we can actually re-save
// the remaining instructions back to the same row (we cannot save something while inside a reader loop)
Tuple<CacheInstructionDto, List<RefreshInstruction>> maxInstructions = null;
//IMPORTANT! We are doing a Query here instead of a Fetch, this means that it will open a data reader
// which we are iterating over instead of loading everything into memory and iterating over that.
// When doing this we must always use a foreach loop so that the Enumerator is disposed and the reader is closed.
foreach (var dto in _appContext.DatabaseContext.Database.Query<CacheInstructionDto>(topSql))
{
if (dto.OriginIdentity == localIdentity)
{
@@ -291,27 +309,73 @@ namespace Umbraco.Core.Sync
continue;
}
// execute remote instructions & update lastId
try
{
NotifyRefreshers(jsonA);
lastId = dto.Id;
}
catch (Exception ex)
{
_logger.Error<DatabaseServerMessenger>(
string.Format("DISTRIBUTED CACHE IS NOT UPDATED. Failed to execute instructions ({0}: \"{1}\"). Instruction is being skipped/ignored", dto.Id, dto.Instructions), ex);
var instructionBatch = GetAllInstructions(jsonA);
//we cannot throw here because this invalid instruction will just keep getting processed over and over and errors
// will be thrown over and over. The only thing we can do is ignore and move on.
lastId = dto.Id;
// Here we should check if there's too many instructions, if there is we should split them and re-save the instructions entry with
// the trimmed instructions. We then don't update the lastsynced value so that this row is re-processed again but with only the remaining
// instructions in it.
if (instructionBatch.Count > _options.MaxProcessingInstructionCount)
{
maxInstructions = new Tuple<CacheInstructionDto, List<RefreshInstruction>>(dto, instructionBatch);
break;
}
//process as per-normal
lastId = ProcessDatabaseInstructions(instructionBatch, dto);
}
//If this is not null this means we've found a row that has more instructions in it than MaxProcessingInstructionCount.
// We process just the first MaxProcessingInstructionCount of them and then re-save the remaining ones to the same row
// so that another request can deal with the rest of the data.
if (maxInstructions != null)
{
    //number of instructions that will be left in the row after this pass
    var remainingCount = maxInstructions.Item2.Count - _options.MaxProcessingInstructionCount;
    _logger.Info<DatabaseServerMessenger>(
        "Max processing instruction count reached. This batch will be processed now but the remaining {0} will be processed by subsequent requests.", () => remainingCount);
    //the batch handled now is indexes [0, MaxProcessingInstructionCount)
    var processingBatch = maxInstructions.Item2.GetRange(0, _options.MaxProcessingInstructionCount);
    //NOTE: We are not persisting the lastId from the result of this method because we will need to re-process it
    ProcessDatabaseInstructions(processingBatch, maxInstructions.Item1);
    //Save the instruction blob back to the DB with the trimmed instruction count.
    // The remainder starts at index MaxProcessingInstructionCount (the first instruction NOT in processingBatch);
    // starting one earlier would re-save an instruction that was just processed and drop the last one in the row.
    var remaining = maxInstructions.Item2.GetRange(_options.MaxProcessingInstructionCount, remainingCount);
    maxInstructions.Item1.UtcStamp = DateTime.UtcNow;
    //serialize the remaining instructions (leave the original identity as-is)
    maxInstructions.Item1.Instructions = JsonConvert.SerializeObject(remaining, Formatting.None);
    ApplicationContext.DatabaseContext.Database.Update(maxInstructions.Item1);
}
if (lastId > 0)
SaveLastSynced(lastId);
}
/// <summary>
/// Executes one batch of refresh instructions that was read from the given instruction row.
/// </summary>
/// <param name="instructionBatch">The instructions to pass to NotifyRefreshers.</param>
/// <param name="dto">The instruction row the batch belongs to; its id is logged on failure and returned.</param>
/// <returns>The id of <paramref name="dto"/>, regardless of whether the batch executed or failed.</returns>
private int ProcessDatabaseInstructions(List<RefreshInstruction> instructionBatch, CacheInstructionDto dto)
{
    // NOTE: ThreadAbortException is not handled separately here. Per the original author's note it will occur if
    // the instructions processing is taking too long since this is occurring on a request thread, or possibly if
    // IIS terminates the appdomain, and it should perhaps be dealt with differently.
    try
    {
        NotifyRefreshers(instructionBatch);
    }
    catch (Exception ex)
    {
        // We cannot rethrow: the same invalid instruction would be picked up and fail again on every
        // subsequent pass. The only thing we can do is log it, ignore it and move on.
        var message = string.Format(
            "DISTRIBUTED CACHE IS NOT UPDATED. Failed to execute instructions (id: {0}, instruction count: {1}). Instruction is being skipped/ignored",
            dto.Id,
            instructionBatch.Count);
        _logger.Error<DatabaseServerMessenger>(message, ex);
    }

    // Both the success and the failure path report the row id back to the caller.
    return dto.Id;
}
/// <summary>
/// Remove old instructions from the database
/// </summary>
@@ -459,8 +523,14 @@ namespace Umbraco.Core.Sync
return jsonRefresher;
}
private static void NotifyRefreshers(IEnumerable<JToken> jsonArray)
/// <summary>
/// Parses out the individual instructions to be processed
/// </summary>
/// <param name="jsonArray"></param>
/// <returns></returns>
private static List<RefreshInstruction> GetAllInstructions(IEnumerable<JToken> jsonArray)
{
var result = new List<RefreshInstruction>();
foreach (var jsonItem in jsonArray)
{
// could be a JObject in which case we can convert to a RefreshInstruction,
@@ -469,33 +539,45 @@ namespace Umbraco.Core.Sync
if (jsonObj != null)
{
var instruction = jsonObj.ToObject<RefreshInstruction>();
switch (instruction.RefreshType)
{
case RefreshMethodType.RefreshAll:
RefreshAll(instruction.RefresherId);
break;
case RefreshMethodType.RefreshByGuid:
RefreshByGuid(instruction.RefresherId, instruction.GuidId);
break;
case RefreshMethodType.RefreshById:
RefreshById(instruction.RefresherId, instruction.IntId);
break;
case RefreshMethodType.RefreshByIds:
RefreshByIds(instruction.RefresherId, instruction.JsonIds);
break;
case RefreshMethodType.RefreshByJson:
RefreshByJson(instruction.RefresherId, instruction.JsonPayload);
break;
case RefreshMethodType.RemoveById:
RemoveById(instruction.RefresherId, instruction.IntId);
break;
}
result.Add(instruction);
}
else
{
var jsonInnerArray = (JArray) jsonItem;
NotifyRefreshers(jsonInnerArray); // recurse
var jsonInnerArray = (JArray)jsonItem;
result.AddRange(GetAllInstructions(jsonInnerArray)); // recurse
}
}
return result;
}
/// <summary>
/// Runs every instruction against the cache refreshers, dispatching on the instruction's refresh type
/// </summary>
/// <param name="instructions">The instructions to execute, in enumeration order</param>
private static void NotifyRefreshers(IEnumerable<RefreshInstruction> instructions)
{
    foreach (var current in instructions)
    {
        // each refresh type is forwarded to the helper of the same name together with the refresher id
        // and the one payload field that this type uses
        switch (current.RefreshType)
        {
            case RefreshMethodType.RefreshAll:
                RefreshAll(current.RefresherId);
                break;

            case RefreshMethodType.RefreshByGuid:
                RefreshByGuid(current.RefresherId, current.GuidId);
                break;

            case RefreshMethodType.RefreshById:
                RefreshById(current.RefresherId, current.IntId);
                break;

            case RefreshMethodType.RefreshByIds:
                RefreshByIds(current.RefresherId, current.JsonIds);
                break;

            case RefreshMethodType.RefreshByJson:
                RefreshByJson(current.RefresherId, current.JsonPayload);
                break;

            case RefreshMethodType.RemoveById:
                RemoveById(current.RefresherId, current.IntId);
                break;

            default:
                // any other refresh type is not handled here and is skipped
                break;
        }
    }
}