Ensures that any given JSON blob written to the instructions can only contain the maximum number of instructions to process
This commit is contained in:
@@ -28,8 +28,7 @@ namespace Umbraco.Core.Sync
|
||||
//
|
||||
public class DatabaseServerMessenger : ServerMessengerBase
|
||||
{
|
||||
private readonly ApplicationContext _appContext;
|
||||
private readonly DatabaseServerMessengerOptions _options;
|
||||
private readonly ApplicationContext _appContext;
|
||||
private readonly ManualResetEvent _syncIdle;
|
||||
private readonly object _locko = new object();
|
||||
private readonly ILogger _logger;
|
||||
@@ -41,6 +40,7 @@ namespace Umbraco.Core.Sync
|
||||
private bool _released;
|
||||
private readonly ProfilingLogger _profilingLogger;
|
||||
|
||||
protected DatabaseServerMessengerOptions Options { get; private set; }
|
||||
protected ApplicationContext ApplicationContext { get { return _appContext; } }
|
||||
|
||||
public DatabaseServerMessenger(ApplicationContext appContext, bool distributedEnabled, DatabaseServerMessengerOptions options)
|
||||
@@ -50,7 +50,7 @@ namespace Umbraco.Core.Sync
|
||||
if (options == null) throw new ArgumentNullException("options");
|
||||
|
||||
_appContext = appContext;
|
||||
_options = options;
|
||||
Options = options;
|
||||
_lastPruned = _lastSync = DateTime.UtcNow;
|
||||
_syncIdle = new ManualResetEvent(true);
|
||||
_profilingLogger = appContext.ProfilingLogger;
|
||||
@@ -159,13 +159,13 @@ namespace Umbraco.Core.Sync
|
||||
{
|
||||
//check for how many instructions there are to process
|
||||
var count = _appContext.DatabaseContext.Database.ExecuteScalar<int>("SELECT COUNT(*) FROM umbracoCacheInstruction WHERE id > @lastId", new {lastId = _lastId});
|
||||
if (count > _options.MaxProcessingInstructionCount)
|
||||
if (count > Options.MaxProcessingInstructionCount)
|
||||
{
|
||||
//too many instructions, proceed to cold boot
|
||||
_logger.Warn<DatabaseServerMessenger>("The instruction count ({0}) exceeds the specified MaxProcessingInstructionCount ({1})."
|
||||
+ " The server will skip existing instructions, rebuild its caches and indexes entirely, adjust its last synced Id"
|
||||
+ " to the latest found in the database and maintain cache updates based on that Id.",
|
||||
() => count, () => _options.MaxProcessingInstructionCount);
|
||||
() => count, () => Options.MaxProcessingInstructionCount);
|
||||
|
||||
coldboot = true;
|
||||
}
|
||||
@@ -183,8 +183,8 @@ namespace Umbraco.Core.Sync
|
||||
SaveLastSynced(maxId);
|
||||
|
||||
// execute initializing callbacks
|
||||
if (_options.InitializingCallbacks != null)
|
||||
foreach (var callback in _options.InitializingCallbacks)
|
||||
if (Options.InitializingCallbacks != null)
|
||||
foreach (var callback in Options.InitializingCallbacks)
|
||||
callback();
|
||||
}
|
||||
|
||||
@@ -206,7 +206,7 @@ namespace Umbraco.Core.Sync
|
||||
if (_released)
|
||||
return;
|
||||
|
||||
if ((DateTime.UtcNow - _lastSync).TotalSeconds <= _options.ThrottleSeconds)
|
||||
if ((DateTime.UtcNow - _lastSync).TotalSeconds <= Options.ThrottleSeconds)
|
||||
return;
|
||||
|
||||
//Set our flag and the lock to be in it's original state (i.e. it can be awaited)
|
||||
@@ -222,7 +222,7 @@ namespace Umbraco.Core.Sync
|
||||
ProcessDatabaseInstructions();
|
||||
|
||||
//Check for pruning throttling
|
||||
if ((DateTime.UtcNow - _lastPruned).TotalSeconds <= _options.PruneThrottleSeconds)
|
||||
if ((DateTime.UtcNow - _lastPruned).TotalSeconds <= Options.PruneThrottleSeconds)
|
||||
return;
|
||||
|
||||
_lastPruned = _lastSync;
|
||||
@@ -270,7 +270,7 @@ namespace Umbraco.Core.Sync
|
||||
.OrderBy<CacheInstructionDto>(dto => dto.Id, _appContext.DatabaseContext.SqlSyntax);
|
||||
|
||||
//only retrieve the max (just in case there's tons)
|
||||
var topSql = _appContext.DatabaseContext.SqlSyntax.SelectTop(sql, _options.MaxProcessingInstructionCount);
|
||||
var topSql = _appContext.DatabaseContext.SqlSyntax.SelectTop(sql, Options.MaxProcessingInstructionCount);
|
||||
|
||||
// only process instructions coming from a remote server, and ignore instructions coming from
|
||||
// the local server as they've already been processed. We should NOT assume that the sequence of
|
||||
@@ -314,7 +314,7 @@ namespace Umbraco.Core.Sync
|
||||
// Here we should check if there's too many instructions, if there is we should split them and re-save the instructions entry with
|
||||
// the trimmed instructions. We then don't update the lastsynced value so that this row is re-processed again but with only the remaining
|
||||
// instructions in it.
|
||||
if (instructionBatch.Count > _options.MaxProcessingInstructionCount)
|
||||
if (instructionBatch.Count > Options.MaxProcessingInstructionCount)
|
||||
{
|
||||
maxInstructions = new Tuple<CacheInstructionDto, List<RefreshInstruction>>(dto, instructionBatch);
|
||||
break;
|
||||
@@ -328,17 +328,17 @@ namespace Umbraco.Core.Sync
|
||||
// just a part of it and then re-save the remaining to the same row so that another request can deal with the data.
|
||||
if (maxInstructions != null)
|
||||
{
|
||||
var remainingCount = maxInstructions.Item2.Count - _options.MaxProcessingInstructionCount;
|
||||
var remainingCount = maxInstructions.Item2.Count - Options.MaxProcessingInstructionCount;
|
||||
|
||||
_logger.Info<DatabaseServerMessenger>(
|
||||
"Max processing instruction count reached. This batch will be processed now but the remaining {0} will be processed by subsequent requests.", () => remainingCount);
|
||||
|
||||
var processingBatch = maxInstructions.Item2.GetRange(0, _options.MaxProcessingInstructionCount);
|
||||
var processingBatch = maxInstructions.Item2.GetRange(0, Options.MaxProcessingInstructionCount);
|
||||
//NOTE: We are not persisting the lastId from the result of this method because we will need to re-process it
|
||||
ProcessDatabaseInstructions(processingBatch, maxInstructions.Item1);
|
||||
|
||||
//Save the instruction blob back to the DB with the trimmed instruction count
|
||||
var remaining = maxInstructions.Item2.GetRange(_options.MaxProcessingInstructionCount - 1, remainingCount);
|
||||
var remaining = maxInstructions.Item2.GetRange(Options.MaxProcessingInstructionCount - 1, remainingCount);
|
||||
maxInstructions.Item1.UtcStamp = DateTime.UtcNow;
|
||||
//serialize the remaining instructions (leave the original identity as-is)
|
||||
maxInstructions.Item1.Instructions = JsonConvert.SerializeObject(remaining, Formatting.None);
|
||||
@@ -386,7 +386,7 @@ namespace Umbraco.Core.Sync
|
||||
/// </remarks>
|
||||
private void PruneOldInstructions()
|
||||
{
|
||||
var pruneDate = DateTime.UtcNow.AddDays(-_options.DaysToRetainInstructions);
|
||||
var pruneDate = DateTime.UtcNow.AddDays(-Options.DaysToRetainInstructions);
|
||||
|
||||
// using 2 queries is faster than convoluted joins
|
||||
|
||||
|
||||
@@ -84,11 +84,16 @@ namespace Umbraco.Web
|
||||
|
||||
var instructions = batch.SelectMany(x => x.Instructions).ToArray();
|
||||
batch.Clear();
|
||||
if (instructions.Length == 0) return;
|
||||
WriteInstructions(instructions);
|
||||
|
||||
//Write the instructions but only create JSON blobs with a max instruction count equal to MaxProcessingInstructionCount
|
||||
foreach (var instructionsBatch in instructions.InGroupsOf(Options.MaxProcessingInstructionCount))
|
||||
{
|
||||
WriteInstructions(instructionsBatch);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void WriteInstructions(RefreshInstruction[] instructions)
|
||||
private void WriteInstructions(IEnumerable<RefreshInstruction> instructions)
|
||||
{
|
||||
var dto = new CacheInstructionDto
|
||||
{
|
||||
@@ -136,9 +141,18 @@ namespace Umbraco.Web
|
||||
|
||||
// batch if we can, else write to DB immediately
|
||||
if (batch == null)
|
||||
WriteInstructions(instructions.ToArray());
|
||||
{
|
||||
//only write the json blob with a maximum count of the MaxProcessingInstructionCount
|
||||
foreach (var maxBatch in instructions.InGroupsOf(Options.MaxProcessingInstructionCount))
|
||||
{
|
||||
WriteInstructions(maxBatch);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
batch.Add(new RefreshInstructionEnvelope(servers, refresher, instructions));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user