Refacetor BackgroundTaskRunner, fix async issues

This commit is contained in:
Stephan
2016-10-04 14:10:20 +02:00
parent 35c7fb60fd
commit 3fd918d381
11 changed files with 324 additions and 108 deletions

View File

@@ -1,9 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Web.Hosting;
using Umbraco.Core;
using Umbraco.Core.Events;
@@ -27,18 +25,21 @@ namespace Umbraco.Web.Scheduling
public class BackgroundTaskRunner<T> : BackgroundTaskRunner, IBackgroundTaskRunner<T>
where T : class, IBackgroundTask
{
// do not remove this comment!
//
// if you plan to do anything on this class, first go and read
// http://blog.stephencleary.com/2012/12/dont-block-in-asynchronous-code.html
// http://stackoverflow.com/questions/19481964/calling-taskcompletionsource-setresult-in-a-non-blocking-manner
// http://stackoverflow.com/questions/21225361/is-there-anything-like-asynchronous-blockingcollectiont
// and more, and more, and more
// and remember: async is hard
private readonly string _logPrefix;
private readonly BackgroundTaskRunnerOptions _options;
private readonly ILogger _logger;
private readonly object _locker = new object();
private readonly BlockingCollection<T> _tasks = new BlockingCollection<T>();
private IEnumerator<T> _enumerator;
// that event is used to stop the pump when it is alive and waiting
// on a latched task - so it waits on the latch, the cancellation token,
// and the completed event
private readonly ManualResetEventSlim _completedEvent = new ManualResetEventSlim(false);
private readonly BufferBlock<T> _tasks = new BufferBlock<T>(new DataflowBlockOptions { });
// in various places we are testing these vars outside a lock, so make them volatile
private volatile bool _isRunning; // is running
@@ -60,7 +61,7 @@ namespace Umbraco.Web.Scheduling
/// <param name="mainDomInstall">An optional action to execute when the main domain status is aquired.</param>
/// <param name="mainDomRelease">An optional action to execute when the main domain status is released.</param>
public BackgroundTaskRunner(ILogger logger, Action mainDomInstall = null, Action mainDomRelease = null)
: this(typeof (T).FullName, new BackgroundTaskRunnerOptions(), logger, mainDomInstall, mainDomRelease)
: this(typeof(T).FullName, new BackgroundTaskRunnerOptions(), logger, mainDomInstall, mainDomRelease)
{ }
/// <summary>
@@ -82,7 +83,7 @@ namespace Umbraco.Web.Scheduling
/// <param name="mainDomInstall">An optional action to execute when the main domain status is aquired.</param>
/// <param name="mainDomRelease">An optional action to execute when the main domain status is released.</param>
public BackgroundTaskRunner(BackgroundTaskRunnerOptions options, ILogger logger, Action mainDomInstall = null, Action mainDomRelease = null)
: this(typeof (T).FullName, options, logger, mainDomInstall, mainDomRelease)
: this(typeof(T).FullName, options, logger, mainDomInstall, mainDomRelease)
{ }
/// <summary>
@@ -173,7 +174,7 @@ namespace Umbraco.Web.Scheduling
/// <returns>An awaitable instance.</returns>
/// <remarks>Used to wait until the runner is no longer running (IsRunning == false),
/// though the runner could be started again afterwards by adding tasks to it.</remarks>
internal ThreadingTaskImmutable StoppedAwaitable
public ThreadingTaskImmutable StoppedAwaitable
{
get
{
@@ -222,8 +223,8 @@ namespace Umbraco.Web.Scheduling
throw new InvalidOperationException("The task runner has completed.");
// add task
_logger.Debug<BackgroundTaskRunner>(_logPrefix + "Task added {0}", task.GetType);
_tasks.Add(task);
_logger.Debug<BackgroundTaskRunner>(_logPrefix + "Task added {0}", () => task.GetType().FullName);
_tasks.Post(task);
// start
StartUpLocked();
@@ -242,13 +243,13 @@ namespace Umbraco.Web.Scheduling
{
if (_completed)
{
_logger.Debug<BackgroundTaskRunner>(_logPrefix + "Task cannot be added {0}, the task runner has already shutdown", task.GetType);
_logger.Debug<BackgroundTaskRunner>(_logPrefix + "Task cannot be added {0}, the task runner has already shutdown", () => task.GetType().FullName);
return false;
}
// add task
_logger.Debug<BackgroundTaskRunner>(_logPrefix + "Task added {0}", task.GetType);
_tasks.Add(task);
_logger.Debug<BackgroundTaskRunner>(_logPrefix + "Task added {0}", () => task.GetType().FullName);
_tasks.Post(task);
// start
StartUpLocked();
@@ -303,9 +304,7 @@ namespace Umbraco.Web.Scheduling
// create a new token source since this is a new process
_shutdownTokenSource = new CancellationTokenSource();
_shutdownToken = _shutdownTokenSource.Token;
_enumerator = _options.KeepAlive ? _tasks.GetConsumingEnumerable(_shutdownToken).GetEnumerator() : null;
_runningTask = Task.Run(async () => await Pump(), _shutdownToken);
_runningTask = Task.Run(async () => await Pump().ConfigureAwait(false), _shutdownToken);
_logger.Debug<BackgroundTaskRunner>(_logPrefix + "Starting");
}
@@ -325,23 +324,22 @@ namespace Umbraco.Web.Scheduling
if (_isRunning == false) return; // done already
}
// try to be nice
// assuming multiple threads can do these without problems
_completedEvent.Set();
_tasks.CompleteAdding();
// complete the queue
// will stop waiting on the queue or on a latch
_tasks.Complete();
if (force)
{
// we must bring everything down, now
Thread.Sleep(100); // give time to CompleteAdding()
Thread.Sleep(100); // give time to Complete()
lock (_locker)
{
// was CompleteAdding() enough?
// was Complete() enough?
if (_isRunning == false) return;
}
// try to cancel running async tasks (cannot do much about sync tasks)
// break delayed tasks delay
// truncate running queues
// break latched tasks
// stop processing the queue
_shutdownTokenSource.Cancel(false); // false is the default
}
@@ -356,66 +354,64 @@ namespace Umbraco.Web.Scheduling
{
while (true)
{
var bgTask = GetNextBackgroundTask();
if (bgTask == null)
return;
// get the next task
// if it returns null the runner is going down, stop
var bgTask = await GetNextBackgroundTask(_shutdownToken);
if (bgTask == null) return;
// set a cancellation source so that the current task can be cancelled
// link from _shutdownToken so that we can use _cancelTokenSource for both
lock (_locker)
{
// set another one - for the next task
_cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_shutdownToken);
}
bgTask = WaitForLatch(bgTask, _cancelTokenSource.Token);
if (bgTask == null) return;
// wait for latch should return the task
// if it returns null it's either that the task has been cancelled
// or the whole runner is going down - in both cases, continue,
// and GetNextBackgroundTask will take care of shutdowns
bgTask = await WaitForLatch(bgTask, _cancelTokenSource.Token);
if (bgTask == null) continue;
// executes & be safe - RunAsync should NOT throw but only raise an event,
// but... just make sure we never ever take everything down
try
{
await RunAsync(bgTask, _cancelTokenSource.Token).ConfigureAwait(false);
}
catch (Exception e)
{
// RunAsync should NOT throw exception - just raise an event
// this is here for safety and to ensure we don't kill everything, ever
_logger.Error<BackgroundTaskRunner>(_logPrefix + "Task runner exception.", e);
}
_cancelTokenSource = null;
// done
lock (_locker)
{
_cancelTokenSource = null;
}
}
}
private T GetNextBackgroundTask()
// gets the next background task from the buffer
private async Task<T> GetNextBackgroundTask(CancellationToken token)
{
while (true)
{
// exit if cancelling
if (_shutdownToken.IsCancellationRequested == false)
{
// try to get a task
// the blocking MoveNext will end if token is cancelled or collection is completed
T bgTask;
var hasBgTask = _options.KeepAlive
? (bgTask = _enumerator.MoveNext() ? _enumerator.Current : null) != null // blocking
: _tasks.TryTake(out bgTask); // non-blocking
// exit if cancelling
if (_shutdownToken.IsCancellationRequested == false && hasBgTask)
return bgTask;
}
var task = await GetNextBackgroundTask2(token);
if (task != null) return task;
lock (_locker)
{
// deal with race condition
if (_shutdownToken.IsCancellationRequested == false && _tasks.Count > 0) continue;
// if we really have nothing to do, stop
_logger.Debug<BackgroundTaskRunner>(_logPrefix + "Stopping");
if (_options.PreserveRunningTask == false)
_runningTask = null;
_isRunning = false;
_shutdownToken = CancellationToken.None;
_enumerator = null;
}
OnEvent(Stopped, "Stopped");
@@ -423,32 +419,82 @@ namespace Umbraco.Web.Scheduling
}
}
private T WaitForLatch(T bgTask, CancellationToken token)
private async Task<T> GetNextBackgroundTask2(CancellationToken shutdownToken)
{
// exit if cancelling
if (shutdownToken.IsCancellationRequested)
return null;
// if keepalive is false then don't block, exit if there is
// no task in the buffer - yes, there is a race cond, which
// we'll take care of
if (_options.KeepAlive == false && _tasks.Count == 0)
return null;
try
{
// A Task<TResult> that informs of whether and when more output is available. If, when the
// task completes, its Result is true, more output is available in the source (though another
// consumer of the source may retrieve the data). If it returns false, more output is not
// and will never be available, due to the source completing prior to output being available.
var output = await _tasks.OutputAvailableAsync(shutdownToken); // block until output or cancelled
if (output == false) return null;
}
catch (TaskCanceledException)
{
return null;
}
try
{
// A task that represents the asynchronous receive operation. When an item value is successfully
// received from the source, the returned task is completed and its Result returns the received
// value. If an item value cannot be retrieved because the source is empty and completed, an
// InvalidOperationException exception is thrown in the returned task.
// the source cannot be empty *and* completed here - we know we have output
return await _tasks.ReceiveAsync(shutdownToken);
}
catch (TaskCanceledException)
{
return null;
}
}
// if bgTask is not a latched background task, or if it is not latched, returns immediately
// else waits for the latch, taking care of completion and shutdown and whatnot
private async Task<T> WaitForLatch(T bgTask, CancellationToken token)
{
var latched = bgTask as ILatchedBackgroundTask;
if (latched == null || latched.IsLatched == false) return bgTask;
// returns the array index of the object that satisfied the wait
var i = WaitHandle.WaitAny(new[] { latched.Latch, token.WaitHandle, _completedEvent.WaitHandle });
// support cancelling awaiting
// read https://github.com/dotnet/corefx/issues/2704
// read http://stackoverflow.com/questions/27238232/how-can-i-cancel-task-whenall
var tokenTaskSource = new TaskCompletionSource<bool>();
token.Register(s => ((TaskCompletionSource<bool>)s).SetResult(true), tokenTaskSource);
switch (i)
{
case 0:
// ok to run now
return bgTask;
case 1:
// cancellation
return null;
case 2:
// termination
if (latched.RunsOnShutdown) return bgTask;
latched.Dispose();
return null;
default:
throw new Exception("panic.");
}
// returns the task that completed
// - latched.Latch completes when the latch releases
// - _tasks.Completion completes when the runner completes
// - tokenTaskSource.Task completes when this task, or the whole runner, is cancelled
var task = await Task.WhenAny(latched.Latch, _tasks.Completion, tokenTaskSource.Task);
// ok to run now
if (task == latched.Latch)
return bgTask;
// if shutting down, return the task only if it runs on shutdown
if (_shutdownToken.IsCancellationRequested == false && latched.RunsOnShutdown) return bgTask;
// else, either it does not run on shutdown or it's been cancelled, dispose
latched.Dispose();
return null;
}
// runs the background task, taking care of shutdown (as far as possible - cannot abort
// a non-async Run for example, so we'll do our best)
private async Task RunAsync(T bgTask, CancellationToken token)
{
try
@@ -483,7 +529,7 @@ namespace Umbraco.Web.Scheduling
catch (Exception ex)
{
_logger.Error<BackgroundTaskRunner>(_logPrefix + "Task has failed", ex);
}
}
}
#region Events
@@ -548,7 +594,7 @@ namespace Umbraco.Web.Scheduling
{
OnEvent(TaskCancelled, "TaskCancelled", e);
//dispose it
// dispose it
e.Task.Dispose();
}