diff --git a/src/Umbraco.Tests/Scheduling/BackgroundTaskRunnerTests.cs b/src/Umbraco.Tests/Scheduling/BackgroundTaskRunnerTests.cs index be681c9e72..db060ddc79 100644 --- a/src/Umbraco.Tests/Scheduling/BackgroundTaskRunnerTests.cs +++ b/src/Umbraco.Tests/Scheduling/BackgroundTaskRunnerTests.cs @@ -7,7 +7,6 @@ using System.Threading.Tasks; using NUnit.Framework; using Umbraco.Core; using Umbraco.Core.Logging; -using Umbraco.Tests.TestHelpers; using Umbraco.Web.Scheduling; namespace Umbraco.Tests.Scheduling @@ -16,7 +15,7 @@ namespace Umbraco.Tests.Scheduling [Timeout(30000)] public class BackgroundTaskRunnerTests { - ILogger _logger; + private ILogger _logger; [TestFixtureSetUp] public void InitializeFixture() @@ -552,29 +551,29 @@ namespace Umbraco.Tests.Scheduling } [Test] - public async void DelayedTaskRuns() + public async void LatchedTaskRuns() { using (var runner = new BackgroundTaskRunner(new BackgroundTaskRunnerOptions(), _logger)) { - var task = new MyDelayedTask(200, false); + var task = new MyLatchedTask(200, false); runner.Add(task); Assert.IsTrue(runner.IsRunning); - Thread.Sleep(5000); + Thread.Sleep(1000); Assert.IsTrue(runner.IsRunning); // still waiting for the task to release Assert.IsFalse(task.HasRun); task.Release(); - await runner.CurrentThreadingTask; //wait for current task to complete + await runner.CurrentThreadingTask; // wait for current task to complete Assert.IsTrue(task.HasRun); await runner.StoppedAwaitable; // wait for the entire runner operation to complete } } [Test] - public async void DelayedTaskStops() + public async void LatchedTaskStops() { using (var runner = new BackgroundTaskRunner(new BackgroundTaskRunnerOptions(), _logger)) { - var task = new MyDelayedTask(200, true); + var task = new MyLatchedTask(200, true); runner.Add(task); Assert.IsTrue(runner.IsRunning); Thread.Sleep(5000); @@ -588,7 +587,7 @@ namespace Umbraco.Tests.Scheduling [Test] - public void DelayedRecurring() + public void LatchedRecurring() { var runCount = 0; var waitHandle = new ManualResetEvent(false); @@ -662,7 +661,6 @@ namespace Umbraco.Tests.Scheduling runner.Add(task); Assert.IsTrue(runner.IsRunning); await runner.StoppedAwaitable; // wait for the entire runner operation to complete - Assert.AreEqual(1, exceptions.Count); // traced and reported } } @@ -684,6 +682,38 @@ namespace Umbraco.Tests.Scheduling } } + [Test] + public async void CancelAsyncTask() + { + using (var runner = new BackgroundTaskRunner(new BackgroundTaskRunnerOptions(), _logger)) + { + var task = new MyAsyncTask(4000); + runner.Add(task); + Assert.IsTrue(runner.IsRunning); + await Task.Delay(1000); // ensure the task *has* started else cannot cancel + runner.CancelCurrentBackgroundTask(); + + await runner.StoppedAwaitable; // wait for the entire runner operation to complete + Assert.AreEqual(default(DateTime), task.Ended); + } + } + + [Test] + public async void CancelLatchedTask() + { + using (var runner = new BackgroundTaskRunner(new BackgroundTaskRunnerOptions(), _logger)) + { + var task = new MyLatchedTask(4000, false); + runner.Add(task); + Assert.IsTrue(runner.IsRunning); + await Task.Delay(1000); // ensure the task *has* started else cannot cancel + runner.CancelCurrentBackgroundTask(); + + await runner.StoppedAwaitable; // wait for the entire runner operation to complete + Assert.IsFalse(task.HasRun); + } + } + private class MyFailingTask : IBackgroundTask { private readonly bool _isAsync; @@ -706,7 +736,7 @@ namespace Umbraco.Tests.Scheduling public async Task RunAsync(CancellationToken token) { - await Task.Delay(1000); + await Task.Delay(1000, token); if (_running) throw new Exception("Task has thrown."); } @@ -750,28 +780,28 @@ namespace Umbraco.Tests.Scheduling public override bool RunsOnShutdown { get { return true; } } } - private class MyDelayedTask : ILatchedBackgroundTask + private class MyLatchedTask : ILatchedBackgroundTask { private readonly int _runMilliseconds; - private readonly ManualResetEventSlim _gate; + private readonly ManualResetEventSlim _latch; public bool HasRun { get; private set; } - public MyDelayedTask(int runMilliseconds, bool runsOnShutdown) + public MyLatchedTask(int runMilliseconds, bool runsOnShutdown) { _runMilliseconds = runMilliseconds; - _gate = new ManualResetEventSlim(false); + _latch = new ManualResetEventSlim(false); RunsOnShutdown = runsOnShutdown; } public WaitHandle Latch { - get { return _gate.WaitHandle; } + get { return _latch.WaitHandle; } } public bool IsLatched { - get { return _gate.IsSet == false; } + get { return _latch.IsSet == false; } } public bool RunsOnShutdown { get; private set; } @@ -784,7 +814,7 @@ namespace Umbraco.Tests.Scheduling public void Release() { - _gate.Set(); + _latch.Set(); } public Task RunAsync(CancellationToken token) @@ -849,6 +879,36 @@ namespace Umbraco.Tests.Scheduling } } + private class MyAsyncTask : BaseTask + { + private readonly int _milliseconds; + + public MyAsyncTask() + : this(500) + { } + + public MyAsyncTask(int milliseconds) + { + _milliseconds = milliseconds; + } + + public override void PerformRun() + { + throw new NotImplementedException(); + } + + public override async Task RunAsync(CancellationToken token) + { + await Task.Delay(_milliseconds, token); + Ended = DateTime.Now; + } + + public override bool IsAsync + { + get { return true; } + } + } + public abstract class BaseTask : IBackgroundTask { public bool WasCancelled { get; set; } @@ -863,13 +923,13 @@ namespace Umbraco.Tests.Scheduling Ended = DateTime.Now; } - public Task RunAsync(CancellationToken token) + public virtual Task RunAsync(CancellationToken token) { throw new NotImplementedException(); //return Task.Delay(500); } - public bool IsAsync + public virtual bool IsAsync { get { return false; } } diff --git a/src/Umbraco.Web/Scheduling/BackgroundTaskRunner.cs b/src/Umbraco.Web/Scheduling/BackgroundTaskRunner.cs index 81487034e1..fa878c3f79 100644 --- a/src/Umbraco.Web/Scheduling/BackgroundTaskRunner.cs +++ b/src/Umbraco.Web/Scheduling/BackgroundTaskRunner.cs @@ -1,5 +1,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using System.Web.Hosting; @@ -28,9 +30,11 @@ namespace Umbraco.Web.Scheduling private readonly string _logPrefix; private readonly BackgroundTaskRunnerOptions _options; private readonly ILogger _logger; - private readonly BlockingCollection _tasks = new BlockingCollection(); private readonly object _locker = new object(); + private readonly BlockingCollection _tasks = new BlockingCollection(); + private IEnumerator _enumerator; + // that event is used to stop the pump when it is alive and waiting // on a latched task - so it waits on the latch, the cancellation token, // and the completed event @@ -38,28 +42,16 @@ namespace Umbraco.Web.Scheduling // in various places we are testing these vars outside a lock, so make them volatile private volatile bool _isRunning; // is running - private volatile bool _isCompleted; // does not accept tasks anymore, may still be running + private volatile bool _completed; // does not accept tasks anymore, may still be running - private Task _runningTask; - private CancellationTokenSource _tokenSource; + private Task _runningTask; // the threading task that is currently executing background tasks + private CancellationTokenSource _shutdownTokenSource; // used to cancel everything and shutdown + private CancellationTokenSource _cancelTokenSource; // used to cancel the current task + private CancellationToken _shutdownToken; private bool _terminating; // ensures we raise that event only once private bool _terminated; // remember we've terminated - private TaskCompletionSource _terminatedSource; // awaitable source - - public event TypedEventHandler, TaskEventArgs> TaskError; - public event TypedEventHandler, TaskEventArgs> TaskStarting; - public event TypedEventHandler, TaskEventArgs> TaskCompleted; - public event TypedEventHandler, TaskEventArgs> TaskCancelled; - - // triggers when the runner stops (but could start again if a task is added to it) - internal event TypedEventHandler, EventArgs> Stopped; - - // triggers when the hosting environment requests that the runner terminates - internal event TypedEventHandler, EventArgs> Terminating; - - // triggers when the runner terminates (no task can be added, no task is running) - internal event TypedEventHandler, EventArgs> Terminated; + private TaskCompletionSource _terminatedSource; // enable awaiting termination /// /// Initializes a new instance of the class. @@ -118,7 +110,7 @@ namespace Umbraco.Web.Scheduling var mainDom = appContext == null ? null : appContext.MainDom; var reg = mainDom == null || ApplicationContext.Current.MainDom.Register(mainDomInstall, mainDomRelease); if (reg == false) - _isCompleted = _terminated = true; + _completed = _terminated = true; if (reg && mainDom == null && mainDomInstall != null) mainDomInstall(); } @@ -136,7 +128,7 @@ namespace Umbraco.Web.Scheduling } /// - /// Gets a value indicating whether a task is currently running. + /// Gets a value indicating whether a threading task is currently running. /// public bool IsRunning { @@ -148,38 +140,40 @@ namespace Umbraco.Web.Scheduling /// public bool IsCompleted { - get { return _isCompleted; } + get { return _completed; } } /// - /// Gets the running task as an immutable object. + /// Gets the running threading task as an immutable awaitable. /// /// There is no running task. /// - /// Unless the AutoStart option is true, there will be no running task until - /// a background task is added to the queue. Unless the KeepAlive option is true, there - /// will be no running task when the queue is empty. + /// Unless the AutoStart option is true, there will be no current threading task until + /// a background task is added to the queue, and there will be no current threading task + /// when the queue is empty. In which case this method returns null. + /// The returned value can be awaited and that is all (eg no continuation). /// - public ThreadingTaskImmutable CurrentThreadingTask + internal ThreadingTaskImmutable CurrentThreadingTask { get { lock (_locker) { - if (_runningTask == null) - throw new InvalidOperationException("There is no current Threading.Task."); - return new ThreadingTaskImmutable(_runningTask); + return _runningTask == null ? null : new ThreadingTaskImmutable(_runningTask); } } } + // fixme should the above throw, return null, a completed task? + + // fixme what's the diff?! /// /// Gets an awaitable used to await the runner running operation. /// /// An awaitable instance. /// Used to wait until the runner is no longer running (IsRunning == false), /// though the runner could be started again afterwards by adding tasks to it. - public ThreadingTaskImmutable StoppedAwaitable + internal ThreadingTaskImmutable StoppedAwaitable { get { @@ -192,20 +186,25 @@ namespace Umbraco.Web.Scheduling } /// - /// Gets an awaitable used to await the runner. + /// Gets an awaitable object that can be used to await for the runner to terminate. /// - /// An awaitable instance. - /// Used to wait until the runner is terminated. - public ThreadingTaskImmutable TerminatedAwaitable + /// An awaitable object. + /// + /// Used to wait until the runner has terminated. + /// This is for unit tests and should not be used otherwise. In most cases when the runner + /// has terminated, the application domain is going down and it is not the right time to do things. + /// + internal ThreadingTaskImmutable TerminatedAwaitable { get { lock (_locker) { - if (_terminatedSource == null && _terminated == false) + if (_terminatedSource == null) _terminatedSource = new TaskCompletionSource(); - var task = _terminatedSource == null ? Task.FromResult(0) : _terminatedSource.Task; - return new ThreadingTaskImmutable(task); + if (_terminated) + _terminatedSource.SetResult(0); + return new ThreadingTaskImmutable(_terminatedSource.Task); } } } @@ -219,7 +218,7 @@ namespace Umbraco.Web.Scheduling { lock (_locker) { - if (_isCompleted) + if (_completed) throw new InvalidOperationException("The task runner has completed."); // add task @@ -241,9 +240,9 @@ namespace Umbraco.Web.Scheduling { lock (_locker) { - if (_isCompleted) + if (_completed) { - _logger.Debug(_logPrefix + "Task cannot be added {0}, the task runner is already shutdown", task.GetType); + _logger.Debug(_logPrefix + "Task cannot be added {0}, the task runner has already shutdown", task.GetType); return false; } @@ -258,18 +257,33 @@ namespace Umbraco.Web.Scheduling } } + /// + /// Cancels to current task, if any. + /// + /// Has no effect if the task runs synchronously, or does not want to cancel. + public void CancelCurrentBackgroundTask() + { + lock (_locker) + { + if (_completed) + throw new InvalidOperationException("The task runner has completed."); + if (_cancelTokenSource != null) + _cancelTokenSource.Cancel(); + } + } + /// /// Starts the tasks runner, if not already running. /// /// Is invoked each time a task is added, to ensure it is going to be processed. /// The task runner has completed. - public void StartUp() + internal void StartUp() { if (_isRunning) return; lock (_locker) { - if (_isCompleted) + if (_completed) throw new InvalidOperationException("The task runner has completed."); StartUpLocked(); @@ -287,8 +301,12 @@ namespace Umbraco.Web.Scheduling _isRunning = true; // create a new token source since this is a new process - _tokenSource = new CancellationTokenSource(); - _runningTask = PumpIBackgroundTasks(Task.Factory, _tokenSource.Token); + _shutdownTokenSource = new CancellationTokenSource(); + _shutdownToken = _shutdownTokenSource.Token; + + _enumerator = _options.KeepAlive ? _tasks.GetConsumingEnumerable(_shutdownToken).GetEnumerator() : null; + _runningTask = Task.Run(async () => await Pump(), _shutdownToken); + _logger.Debug(_logPrefix + "Starting"); } @@ -303,7 +321,7 @@ namespace Umbraco.Web.Scheduling { lock (_locker) { - _isCompleted = true; // do not accept new tasks + _completed = true; // do not accept new tasks if (_isRunning == false) return; // done already } @@ -324,7 +342,7 @@ namespace Umbraco.Web.Scheduling // try to cancel running async tasks (cannot do much about sync tasks) // break delayed tasks delay // truncate running queues - _tokenSource.Cancel(false); // false is the default + _shutdownTokenSource.Cancel(false); // false is the default } // tasks in the queue will be executed... @@ -334,145 +352,104 @@ namespace Umbraco.Web.Scheduling _runningTask.Wait(); // wait for whatever is running to end... } - /// - /// Runs background tasks for as long as there are background tasks in the queue, with an asynchronous operation. - /// - /// The supporting . - /// A cancellation token. - /// The asynchronous operation. - private Task PumpIBackgroundTasks(TaskFactory factory, CancellationToken token) + private async Task Pump() { - var taskSource = new TaskCompletionSource(factory.CreationOptions); - var enumerator = _options.KeepAlive ? _tasks.GetConsumingEnumerable(token).GetEnumerator() : null; - - // ReSharper disable once MethodSupportsCancellation // always run - var taskSourceContinuing = taskSource.Task.ContinueWith(t => + while (true) { - // because the pump does not lock, there's a race condition, - // the pump may stop and then we still have tasks to process, - // and then we must restart the pump - lock to avoid race cond - var onStopped = false; + var bgTask = GetNextBackgroundTask(); + if (bgTask == null) + return; + lock (_locker) { - if (token.IsCancellationRequested || _tasks.Count == 0) - { - _logger.Debug(_logPrefix + "Stopping"); - - if (_options.PreserveRunningTask == false) - _runningTask = null; - - // stopped - _isRunning = false; - onStopped = true; - } + // set another one - for the next task + _cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_shutdownToken); } - if (onStopped) + bgTask = WaitForLatch(bgTask, _cancelTokenSource.Token); + if (bgTask == null) return; + + try { - OnEvent(Stopped, "Stopped"); - return; + await RunAsync(bgTask, _cancelTokenSource.Token).ConfigureAwait(false); } - - // if _runningTask is taskSource.Task then we must keep continuing it, - // not starting a new taskSource, else _runningTask would complete and - // something may be waiting on it - //PumpIBackgroundTasks(factory, token); // restart - // ReSharper disable MethodSupportsCancellation // always run - t.ContinueWithTask(_ => PumpIBackgroundTasks(factory, token)); // restart - // ReSharper restore MethodSupportsCancellation - }); - - Action pump = null; - pump = task => - { - // RunIBackgroundTaskAsync does NOT throw exceptions, just raises event - // so if we have an exception here, really, wtf? - must read the exception - // anyways so it does not bubble up and kill everything - if (task != null && task.IsFaulted) + catch (Exception e) { - var exception = task.Exception; - _logger.Error(_logPrefix + "Task runner exception.", exception); + // RunAsync should NOT throw exception - just raise an event + // this is here for safety and to ensure we don't kill everything, ever + _logger.Error(_logPrefix + "Task runner exception.", e); } - // is it ok to run? - if (TaskSourceCanceled(taskSource, token)) return; - - // try to get a task - // the blocking MoveNext will end if token is cancelled or collection is completed - T bgTask; - var hasBgTask = _options.KeepAlive - // ReSharper disable once PossibleNullReferenceException - ? (bgTask = enumerator.MoveNext() ? enumerator.Current : null) != null // blocking - : _tasks.TryTake(out bgTask); // non-blocking - - // no task, signal the runner we're done - if (hasBgTask == false) - { - TaskSourceCompleted(taskSource, token); - return; - } - - // wait for latched task, supporting cancellation - var dbgTask = bgTask as ILatchedBackgroundTask; - if (dbgTask != null && dbgTask.IsLatched) - { - WaitHandle.WaitAny(new[] { dbgTask.Latch, token.WaitHandle, _completedEvent.WaitHandle }); - if (TaskSourceCanceled(taskSource, token)) return; - // else run now, either because latch ok or runner is completed - // still latched & not running on shutdown = stop here - if (dbgTask.IsLatched && dbgTask.RunsOnShutdown == false) - { - dbgTask.Dispose(); // will not run - TaskSourceCompleted(taskSource, token); - return; - } - } - - // run the task as first task, or a continuation - task = task == null - ? RunIBackgroundTaskAsync(bgTask, token) - // ReSharper disable once MethodSupportsCancellation // always run - : task.ContinueWithTask(_ => RunIBackgroundTaskAsync(bgTask, token)); - - // and pump - // ReSharper disable once MethodSupportsCancellation // always run - task.ContinueWith(t => pump(t)); - }; - - // start it all - factory.StartNew(() => pump(null), - token, - _options.LongRunning ? TaskCreationOptions.LongRunning : TaskCreationOptions.None, - TaskScheduler.Default); - - return taskSourceContinuing; - } - - private static bool TaskSourceCanceled(TaskCompletionSource taskSource, CancellationToken token) - { - if (token.IsCancellationRequested) - { - taskSource.SetCanceled(); - return true; + _cancelTokenSource = null; } - return false; } - private static void TaskSourceCompleted(TaskCompletionSource taskSource, CancellationToken token) + private T GetNextBackgroundTask() { - if (token.IsCancellationRequested) - taskSource.SetCanceled(); - else - taskSource.SetResult(null); + while (true) + { + // exit if cancelling + if (_shutdownToken.IsCancellationRequested == false) + { + // try to get a task + // the blocking MoveNext will end if token is cancelled or collection is completed + T bgTask; + var hasBgTask = _options.KeepAlive + ? (bgTask = _enumerator.MoveNext() ? _enumerator.Current : null) != null // blocking + : _tasks.TryTake(out bgTask); // non-blocking + + // exit if cancelling + if (_shutdownToken.IsCancellationRequested == false && hasBgTask) + return bgTask; + } + + lock (_locker) + { + if (_shutdownToken.IsCancellationRequested == false && _tasks.Count > 0) continue; + + _logger.Debug(_logPrefix + "Stopping"); + + if (_options.PreserveRunningTask == false) + _runningTask = null; + + _isRunning = false; + + _shutdownToken = CancellationToken.None; + _enumerator = null; + } + + OnEvent(Stopped, "Stopped"); + return null; + } } - /// - /// Runs a background task asynchronously. - /// - /// The background task. - /// A cancellation token. - /// The asynchronous operation. - internal async Task RunIBackgroundTaskAsync(T bgTask, CancellationToken token) + private T WaitForLatch(T bgTask, CancellationToken token) + { + var latched = bgTask as ILatchedBackgroundTask; + if (latched == null || latched.IsLatched == false) return bgTask; + + // returns the array index of the object that satisfied the wait + var i = WaitHandle.WaitAny(new[] { latched.Latch, token.WaitHandle, _completedEvent.WaitHandle }); + + switch (i) + { + case 0: + // ok to run now + return bgTask; + case 1: + // cancellation + return null; + case 2: + // termination + if (latched.RunsOnShutdown) return bgTask; + latched.Dispose(); + return null; + default: + throw new Exception("panic."); + } + } + + private async Task RunAsync(T bgTask, CancellationToken token) { try { @@ -488,7 +465,7 @@ namespace Umbraco.Web.Scheduling else bgTask.Run(); } - finally // ensure we disposed - unless latched (again) + finally // ensure we disposed - unless latched again ie wants to re-run { var lbgTask = bgTask as ILatchedBackgroundTask; if (lbgTask == null || lbgTask.IsLatched == false) @@ -506,11 +483,32 @@ namespace Umbraco.Web.Scheduling catch (Exception ex) { _logger.Error(_logPrefix + "Task has failed", ex); - } + } } #region Events + // triggers when a background task starts + public event TypedEventHandler, TaskEventArgs> TaskStarting; + + // triggers when a background task has completed + public event TypedEventHandler, TaskEventArgs> TaskCompleted; + + // triggers when a background task throws + public event TypedEventHandler, TaskEventArgs> TaskError; + + // triggers when a background task is cancelled + public event TypedEventHandler, TaskEventArgs> TaskCancelled; + + // triggers when the runner stops (but could start again if a task is added to it) + internal event TypedEventHandler, EventArgs> Stopped; + + // triggers when the hosting environment requests that the runner terminates + internal event TypedEventHandler, EventArgs> Terminating; + + // triggers when the runner has terminated (no task can be added, no task is running) + internal event TypedEventHandler, EventArgs> Terminated; + private void OnEvent(TypedEventHandler, EventArgs> handler, string name) { if (handler == null) return; @@ -632,9 +630,7 @@ namespace Umbraco.Web.Scheduling _logger.Info(_logPrefix + "Waiting for tasks to complete"); Shutdown(false, false); // do not accept any more tasks, flush the queue, do not wait - // raise the completed event only after the running task has completed - // and there's no more task running - + // raise the completed event only after the running threading task has completed lock (_locker) { if (_runningTask != null) @@ -656,6 +652,7 @@ namespace Umbraco.Web.Scheduling } } + // called by Stop either immediately or eventually private void Terminate(bool immediate) { // signal the environment we have terminated @@ -664,8 +661,6 @@ namespace Umbraco.Web.Scheduling // complete the awaitable completion source, if any HostingEnvironment.UnregisterObject(this); - _logger.Info(_logPrefix + "Tasks " + (immediate ? "cancelled" : "completed") + ", terminated"); - OnEvent(Terminated, "Terminated"); TaskCompletionSource terminatedSource; lock (_locker) @@ -673,6 +668,11 @@ namespace Umbraco.Web.Scheduling _terminated = true; terminatedSource = _terminatedSource; } + + _logger.Info(_logPrefix + "Tasks " + (immediate ? "cancelled" : "completed") + ", terminated"); + + OnEvent(Terminated, "Terminated"); + if (terminatedSource != null) terminatedSource.SetResult(0); }