refactor BackgroundTaskRunner

This commit is contained in:
Stephan
2015-02-06 16:10:34 +01:00
committed by Shannon
parent bc068b201d
commit b7436dc55f
11 changed files with 1168 additions and 380 deletions

View File

@@ -1,7 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
@@ -13,97 +13,272 @@ namespace Umbraco.Tests.Scheduling
[TestFixture]
public class BackgroundTaskRunnerTests
{
[Test]
public void Startup_And_Shutdown()
private static void AssertRunnerStopsRunning<T>(BackgroundTaskRunner<T> runner, int timeoutMilliseconds = 2000)
where T : class, IBackgroundTask
{
BackgroundTaskRunner<IBackgroundTask> tManager;
using (tManager = new BackgroundTaskRunner<IBackgroundTask>(true, true))
{
tManager.StartUp();
}
const int period = 200;
NUnit.Framework.Assert.IsFalse(tManager.IsRunning);
var i = 0;
var m = timeoutMilliseconds/period;
while (runner.IsRunning && i++ < m)
Thread.Sleep(period);
Assert.IsFalse(runner.IsRunning, "Runner is still running.");
}
[Test]
public void Startup_Starts_Automatically()
public void ShutdownWaitWhenRunning()
{
BackgroundTaskRunner<BaseTask> tManager;
using (tManager = new BackgroundTaskRunner<BaseTask>(true, true))
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions { AutoStart = true, KeepAlive = true }))
{
tManager.Add(new MyTask());
NUnit.Framework.Assert.IsTrue(tManager.IsRunning);
Assert.IsTrue(runner.IsRunning);
Thread.Sleep(800); // for long
Assert.IsTrue(runner.IsRunning);
runner.Shutdown(false, true); // -force +wait
AssertRunnerStopsRunning(runner);
Assert.IsTrue(runner.IsCompleted);
}
}
[Test]
public void Task_Runs()
public void ShutdownWhenRunning()
{
var myTask = new MyTask();
var waitHandle = new ManualResetEvent(false);
BackgroundTaskRunner<BaseTask> tManager;
using (tManager = new BackgroundTaskRunner<BaseTask>(true, true))
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
tManager.TaskCompleted += (sender, task) => waitHandle.Set();
// do NOT try to do this because the code must run on the UI thread which
// is not availably, and so the thread never actually starts - wondering
// what it means for ASP.NET?
//runner.TaskStarting += (sender, args) => Console.WriteLine("starting {0:c}", DateTime.Now);
//runner.TaskCompleted += (sender, args) => Console.WriteLine("completed {0:c}", DateTime.Now);
tManager.Add(myTask);
//wait for ITasks to complete
waitHandle.WaitOne();
NUnit.Framework.Assert.IsTrue(myTask.Ended != default(DateTime));
Assert.IsFalse(runner.IsRunning);
runner.Add(new MyTask(5000));
Assert.IsTrue(runner.IsRunning); // is running the task
runner.Shutdown(false, false); // -force -wait
Assert.IsTrue(runner.IsCompleted);
Assert.IsTrue(runner.IsRunning); // still running that task
Thread.Sleep(3000);
Assert.IsTrue(runner.IsRunning); // still running that task
AssertRunnerStopsRunning(runner, 10000);
}
}
[Test]
public void Many_Tasks_Run()
public void ShutdownFlushesTheQueue()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
Assert.IsFalse(runner.IsRunning);
runner.Add(new MyTask(5000));
runner.Add(new MyTask());
var t = new MyTask();
runner.Add(t);
Assert.IsTrue(runner.IsRunning); // is running the first task
runner.Shutdown(false, false); // -force -wait
AssertRunnerStopsRunning(runner, 10000);
Assert.AreNotEqual(DateTime.MinValue, t.Ended); // t has run
}
}
[Test]
public void ShutdownForceTruncatesTheQueue()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
Assert.IsFalse(runner.IsRunning);
runner.Add(new MyTask(5000));
runner.Add(new MyTask());
var t = new MyTask();
runner.Add(t);
Assert.IsTrue(runner.IsRunning); // is running the first task
runner.Shutdown(true, false); // +force -wait
AssertRunnerStopsRunning(runner, 10000);
Assert.AreEqual(DateTime.MinValue, t.Ended); // t has not run
}
}
[Test]
public void ShutdownThenForce()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
Assert.IsFalse(runner.IsRunning);
runner.Add(new MyTask(5000));
runner.Add(new MyTask());
runner.Add(new MyTask());
Assert.IsTrue(runner.IsRunning); // is running the task
runner.Shutdown(false, false); // -force -wait
Assert.IsTrue(runner.IsCompleted);
Assert.IsTrue(runner.IsRunning); // still running that task
Thread.Sleep(3000);
Assert.IsTrue(runner.IsRunning); // still running that task
runner.Shutdown(true, false); // +force -wait
AssertRunnerStopsRunning(runner, 20000);
}
}
[Test]
public void Create_IsRunning()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
Assert.IsFalse(runner.IsRunning);
}
}
[Test]
public void Create_AutoStart_IsRunning()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions { AutoStart = true }))
{
Assert.IsTrue(runner.IsRunning);
AssertRunnerStopsRunning(runner); // though not for long
}
}
[Test]
public void Create_AutoStartAndKeepAlive_IsRunning()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions { AutoStart = true, KeepAlive = true }))
{
Assert.IsTrue(runner.IsRunning);
Thread.Sleep(800); // for long
Assert.IsTrue(runner.IsRunning);
// dispose will stop it
}
}
[Test]
public void Dispose_IsRunning()
{
BackgroundTaskRunner<IBackgroundTask> runner;
using (runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions { AutoStart = true, KeepAlive = true }))
{
Assert.IsTrue(runner.IsRunning);
// dispose will stop it
}
AssertRunnerStopsRunning(runner);
Assert.Throws<InvalidOperationException>(() => runner.Add(new MyTask()));
}
[Test]
public void Startup_IsRunning()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
Assert.IsFalse(runner.IsRunning);
runner.StartUp();
Assert.IsTrue(runner.IsRunning);
AssertRunnerStopsRunning(runner); // though not for long
}
}
[Test]
public void Startup_KeepAlive_IsRunning()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions { KeepAlive = true }))
{
Assert.IsFalse(runner.IsRunning);
runner.StartUp();
Assert.IsTrue(runner.IsRunning);
// dispose will stop it
}
}
[Test]
public void Create_AddTask_IsRunning()
{
using (var runner = new BackgroundTaskRunner<BaseTask>(new BackgroundTaskRunnerOptions()))
{
runner.Add(new MyTask());
Assert.IsTrue(runner.IsRunning);
Thread.Sleep(800); // task takes 500ms
Assert.IsFalse(runner.IsRunning);
}
}
[Test]
public void Create_KeepAliveAndAddTask_IsRunning()
{
using (var runner = new BackgroundTaskRunner<BaseTask>(new BackgroundTaskRunnerOptions { KeepAlive = true }))
{
runner.Add(new MyTask());
Assert.IsTrue(runner.IsRunning);
Thread.Sleep(800); // task takes 500ms
Assert.IsTrue(runner.IsRunning);
// dispose will stop it
}
}
[Test]
public async void WaitOnRunner_OneTask()
{
using (var runner = new BackgroundTaskRunner<BaseTask>(new BackgroundTaskRunnerOptions()))
{
var task = new MyTask();
Assert.IsTrue(task.Ended == default(DateTime));
runner.Add(task);
await runner; // wait 'til it's not running anymore
Assert.IsTrue(task.Ended != default(DateTime)); // task is done
AssertRunnerStopsRunning(runner); // though not for long
}
}
[Test]
public async void WaitOnRunner_Tasks()
{
var tasks = new List<BaseTask>();
for (var i = 0; i < 10; i++)
tasks.Add(new MyTask());
using (var runner = new BackgroundTaskRunner<BaseTask>(new BackgroundTaskRunnerOptions { KeepAlive = false, LongRunning = true, PreserveRunningTask = true }))
{
tasks.ForEach(runner.Add);
await runner; // wait 'til it's not running anymore
// check that tasks are done
Assert.IsTrue(tasks.All(x => x.Ended != default(DateTime)));
Assert.AreEqual(TaskStatus.RanToCompletion, runner.TaskStatus);
Assert.IsFalse(runner.IsRunning);
Assert.IsFalse(runner.IsDisposed);
}
}
[Test]
public void WaitOnTask()
{
using (var runner = new BackgroundTaskRunner<BaseTask>(new BackgroundTaskRunnerOptions()))
{
var task = new MyTask();
var waitHandle = new ManualResetEvent(false);
runner.TaskCompleted += (sender, t) => waitHandle.Set();
Assert.IsTrue(task.Ended == default(DateTime));
runner.Add(task);
waitHandle.WaitOne(); // wait 'til task is done
Assert.IsTrue(task.Ended != default(DateTime)); // task is done
AssertRunnerStopsRunning(runner); // though not for long
}
}
[Test]
public void WaitOnTasks()
{
var tasks = new Dictionary<BaseTask, ManualResetEvent>();
for (var i = 0; i < 10; i++)
{
tasks.Add(new MyTask(), new ManualResetEvent(false));
}
BackgroundTaskRunner<BaseTask> tManager;
using (tManager = new BackgroundTaskRunner<BaseTask>(true, true))
using (var runner = new BackgroundTaskRunner<BaseTask>(new BackgroundTaskRunnerOptions()))
{
tManager.TaskCompleted += (sender, task) => tasks[task.Task].Set();
runner.TaskCompleted += (sender, task) => tasks[task.Task].Set();
foreach (var t in tasks) runner.Add(t.Key);
tasks.ForEach(t => tManager.Add(t.Key));
//wait for all ITasks to complete
// wait 'til tasks are done, check that tasks are done
WaitHandle.WaitAll(tasks.Values.Select(x => (WaitHandle)x).ToArray());
Assert.IsTrue(tasks.All(x => x.Key.Ended != default(DateTime)));
foreach (var task in tasks)
{
NUnit.Framework.Assert.IsTrue(task.Key.Ended != default(DateTime));
}
}
}
[Test]
public async void Many_Tasks_Added_Only_Last_Task_Executes_With_Option()
{
var tasks = new Dictionary<BaseTask, ManualResetEvent>();
for (var i = 0; i < 10; i++)
{
tasks.Add(new MyTask(), new ManualResetEvent(false));
}
BackgroundTaskRunner<BaseTask> tManager;
using (tManager = new BackgroundTaskRunner<BaseTask>(new BackgroundTaskRunnerOptions{OnlyProcessLastItem = true}))
{
tasks.ForEach(t => tManager.Add(t.Key));
//wait till the thread is done
await tManager;
var countExecuted = tasks.Count(x => x.Key.Ended != default(DateTime));
Assert.AreEqual(1, countExecuted);
AssertRunnerStopsRunning(runner); // though not for long
}
}
@@ -123,7 +298,7 @@ namespace Umbraco.Tests.Scheduling
IDictionary<BaseTask, ManualResetEvent> tasks = getTasks();
BackgroundTaskRunner<BaseTask> tManager;
using (tManager = new BackgroundTaskRunner<BaseTask>(true, true))
using (tManager = new BackgroundTaskRunner<BaseTask>(new BackgroundTaskRunnerOptions { LongRunning = true, KeepAlive = true }))
{
tManager.TaskCompleted += (sender, task) => tasks[task.Task].Set();
@@ -135,7 +310,7 @@ namespace Umbraco.Tests.Scheduling
foreach (var task in tasks)
{
NUnit.Framework.Assert.IsTrue(task.Key.Ended != default(DateTime));
Assert.IsTrue(task.Key.Ended != default(DateTime));
}
//execute another batch after a bit
@@ -149,71 +324,11 @@ namespace Umbraco.Tests.Scheduling
foreach (var task in tasks)
{
NUnit.Framework.Assert.IsTrue(task.Key.Ended != default(DateTime));
Assert.IsTrue(task.Key.Ended != default(DateTime));
}
}
}
[Test]
public void Task_Queue_Will_Be_Completed_Before_Shutdown()
{
var tasks = new Dictionary<BaseTask, ManualResetEvent>();
for (var i = 0; i < 10; i++)
{
tasks.Add(new MyTask(), new ManualResetEvent(false));
}
BackgroundTaskRunner<BaseTask> tManager;
using (tManager = new BackgroundTaskRunner<BaseTask>(true, true))
{
tManager.TaskCompleted += (sender, task) => tasks[task.Task].Set();
tasks.ForEach(t => tManager.Add(t.Key));
////wait for all ITasks to complete
//WaitHandle.WaitAll(tasks.Values.Select(x => (WaitHandle)x).ToArray());
tManager.Stop(false);
//immediate stop will block until complete - but since we are running on
// a single thread this doesn't really matter as the above will just process
// until complete.
tManager.Stop(true);
NUnit.Framework.Assert.AreEqual(0, tManager.TaskCount);
}
}
//NOTE: These tests work in .Net 4.5 but in this current version we don't have the correct
// async/await signatures with GetAwaiter, so am just commenting these out in this version
[Test]
public async void Non_Persistent_Runner_Will_End_After_Queue_Empty()
{
var tasks = new List<BaseTask>();
for (var i = 0; i < 10; i++)
{
tasks.Add(new MyTask());
}
BackgroundTaskRunner<BaseTask> tManager;
using (tManager = new BackgroundTaskRunner<BaseTask>(persistentThread: false, dedicatedThread:true))
{
tasks.ForEach(t => tManager.Add(t));
//wait till the thread is done
await tManager;
foreach (var task in tasks)
{
Assert.IsTrue(task.Ended != default(DateTime));
}
Assert.AreEqual(TaskStatus.RanToCompletion, tManager.TaskStatus);
Assert.IsFalse(tManager.IsRunning);
Assert.IsFalse(tManager.IsDisposed);
}
}
[Test]
public async void Non_Persistent_Runner_Will_Start_New_Threads_When_Required()
{
@@ -229,10 +344,9 @@ namespace Umbraco.Tests.Scheduling
List<BaseTask> tasks = getTasks();
BackgroundTaskRunner<BaseTask> tManager;
using (tManager = new BackgroundTaskRunner<BaseTask>(persistentThread: false, dedicatedThread: true))
using (var tManager = new BackgroundTaskRunner<BaseTask>(new BackgroundTaskRunnerOptions { LongRunning = true, PreserveRunningTask = true }))
{
tasks.ForEach(t => tManager.Add(t));
tasks.ForEach(tManager.Add);
//wait till the thread is done
await tManager;
@@ -250,7 +364,7 @@ namespace Umbraco.Tests.Scheduling
tasks = getTasks();
//add more tasks
tasks.ForEach(t => tManager.Add(t));
tasks.ForEach(tManager.Add);
//wait till the thread is done
await tManager;
@@ -265,19 +379,296 @@ namespace Umbraco.Tests.Scheduling
Assert.IsFalse(tManager.IsDisposed);
}
}
private class MyTask : BaseTask
[Test]
public void RecurringTaskTest()
{
public MyTask()
// note: can have BackgroundTaskRunner<IBackgroundTask> and use it in MyRecurringTask ctor
// because that ctor wants IBackgroundTaskRunner<MyRecurringTask> and the generic type
// parameter is contravariant ie defined as IBackgroundTaskRunner<in T> so doing the
// following is legal:
// var IBackgroundTaskRunner<Base> b = ...;
// var IBackgroundTaskRunner<Derived> d = b; // legal
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
var task = new MyRecurringTask(runner, 200, 500);
MyRecurringTask.RunCount = 0;
runner.Add(task);
Thread.Sleep(5000);
Assert.GreaterOrEqual(MyRecurringTask.RunCount, 2); // keeps running, count >= 2
// stops recurring
runner.Shutdown(false, false);
AssertRunnerStopsRunning(runner);
// timer may try to add a task but it won't work because runner is completed
}
}
[Test]
public void DelayedTaskRuns()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
var task = new MyDelayedTask(200);
runner.Add(task);
Assert.IsTrue(runner.IsRunning);
Thread.Sleep(5000);
Assert.IsTrue(runner.IsRunning); // still waiting for the task to release
Assert.IsFalse(task.HasRun);
task.Release();
Thread.Sleep(500);
Assert.IsTrue(task.HasRun);
AssertRunnerStopsRunning(runner); // runs task & exit
}
}
[Test]
public void DelayedTaskStops()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
var task = new MyDelayedTask(200);
runner.Add(task);
Assert.IsTrue(runner.IsRunning);
Thread.Sleep(5000);
Assert.IsTrue(runner.IsRunning); // still waiting for the task to release
Assert.IsFalse(task.HasRun);
runner.Shutdown(false, false);
AssertRunnerStopsRunning(runner); // runs task & exit
Assert.IsTrue(task.HasRun);
}
}
[Test]
public void DelayedRecurring()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
var task = new MyDelayedRecurringTask(runner, 2000, 1000);
MyDelayedRecurringTask.RunCount = 0;
runner.Add(task);
Thread.Sleep(1000);
Assert.IsTrue(runner.IsRunning); // waiting on delay
Assert.AreEqual(0, MyDelayedRecurringTask.RunCount);
Thread.Sleep(1000);
Assert.AreEqual(1, MyDelayedRecurringTask.RunCount);
Thread.Sleep(5000);
Assert.GreaterOrEqual(MyDelayedRecurringTask.RunCount, 2); // keeps running, count >= 2
// stops recurring
runner.Shutdown(false, false);
AssertRunnerStopsRunning(runner);
// timer may try to add a task but it won't work because runner is completed
}
}
[Test]
public void FailingTaskSync()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
var exceptions = new ConcurrentQueue<Exception>();
runner.TaskError += (sender, args) => exceptions.Enqueue(args.Exception);
var task = new MyFailingTask(false); // -async
runner.Add(task);
Assert.IsTrue(runner.IsRunning);
AssertRunnerStopsRunning(runner); // runs task & exit
Assert.AreEqual(1, exceptions.Count); // traced and reported
}
}
[Test]
public void FailingTaskAsync()
{
using (var runner = new BackgroundTaskRunner<IBackgroundTask>(new BackgroundTaskRunnerOptions()))
{
var exceptions = new ConcurrentQueue<Exception>();
runner.TaskError += (sender, args) => exceptions.Enqueue(args.Exception);
var task = new MyFailingTask(true); // +async
runner.Add(task);
Assert.IsTrue(runner.IsRunning);
AssertRunnerStopsRunning(runner); // runs task & exit
Assert.AreEqual(1, exceptions.Count); // traced and reported
}
}
private class MyFailingTask : IBackgroundTask
{
private readonly bool _isAsync;
public MyFailingTask(bool isAsync)
{
_isAsync = isAsync;
}
public void Run()
{
Thread.Sleep(1000);
throw new Exception("Task has thrown.");
}
public async Task RunAsync()
{
await Task.Delay(1000);
throw new Exception("Task has thrown.");
}
public bool IsAsync
{
get { return _isAsync; }
}
// fixme - must also test what happens if we throw on dispose!
public void Dispose()
{ }
}
private class MyDelayedRecurringTask : DelayedRecurringTaskBase<MyDelayedRecurringTask>
{
public MyDelayedRecurringTask(IBackgroundTaskRunner<MyDelayedRecurringTask> runner, int delayMilliseconds, int periodMilliseconds)
: base(runner, delayMilliseconds, periodMilliseconds)
{ }
private MyDelayedRecurringTask(MyDelayedRecurringTask source)
: base(source)
{ }
public static int RunCount { get; set; }
public override bool IsAsync
{
get { return false; }
}
public override void PerformRun()
{
Thread.Sleep(500);
// nothing to do at the moment
RunCount += 1;
}
public override Task PerformRunAsync()
{
throw new NotImplementedException();
}
protected override MyDelayedRecurringTask GetRecurring()
{
return new MyDelayedRecurringTask(this);
}
}
private class MyDelayedTask : IDelayedBackgroundTask
{
private readonly int _runMilliseconds;
private readonly ManualResetEvent _gate;
public bool HasRun { get; private set; }
public MyDelayedTask(int runMilliseconds)
{
_runMilliseconds = runMilliseconds;
_gate = new ManualResetEvent(false);
}
public WaitHandle DelayWaitHandle
{
get { return _gate; }
}
public bool IsDelayed
{
get { return true; }
}
public void Run()
{
Thread.Sleep(_runMilliseconds);
HasRun = true;
}
public void Release()
{
_gate.Set();
}
public Task RunAsync()
{
throw new NotImplementedException();
}
public bool IsAsync
{
get { return false; }
}
public void Dispose()
{ }
}
private class MyRecurringTask : RecurringTaskBase<MyRecurringTask>
{
private readonly int _runMilliseconds;
public static int RunCount { get; set; }
public MyRecurringTask(IBackgroundTaskRunner<MyRecurringTask> runner, int runMilliseconds, int periodMilliseconds)
: base(runner, periodMilliseconds)
{
_runMilliseconds = runMilliseconds;
}
private MyRecurringTask(MyRecurringTask source, int runMilliseconds)
: base(source)
{
_runMilliseconds = runMilliseconds;
}
public override void PerformRun()
{
RunCount += 1;
Thread.Sleep(_runMilliseconds);
}
public override Task PerformRunAsync()
{
throw new NotImplementedException();
}
public override bool IsAsync
{
get { return false; }
}
protected override MyRecurringTask GetRecurring()
{
return new MyRecurringTask(this, _runMilliseconds);
}
}
private class MyTask : BaseTask
{
private readonly int _milliseconds;
public MyTask()
: this(500)
{ }
public MyTask(int milliseconds)
{
_milliseconds = milliseconds;
}
public override void PerformRun()
{
Thread.Sleep(_milliseconds);
}
}
public abstract class BaseTask : IBackgroundTask
@@ -287,14 +678,17 @@ namespace Umbraco.Tests.Scheduling
public Guid UniqueId { get; protected set; }
public abstract void PerformRun();
public void Run()
{
PerformRun();
Ended = DateTime.Now;
}
public Task RunAsync()
{
throw new NotImplementedException();
//return Task.Delay(500); // fixme
}
public bool IsAsync
@@ -312,10 +706,7 @@ namespace Umbraco.Tests.Scheduling
public DateTime Ended { get; set; }
public virtual void Dispose()
{
}
{ }
}
}
}

View File

@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -9,60 +10,97 @@ using Umbraco.Core.Logging;
namespace Umbraco.Web.Scheduling
{
/// <summary>
/// This is used to create a background task runner which will stay alive in the background of and complete
/// any tasks that are queued. It is web aware and will ensure that it is shutdown correctly when the app domain
/// is shutdown.
/// Manages a queue of tasks of type <typeparamref name="T"/> and runs them in the background.
/// </summary>
/// <typeparam name="T"></typeparam>
internal class BackgroundTaskRunner<T> : IDisposable, IRegisteredObject
where T : IBackgroundTask
/// <typeparam name="T">The type of the managed tasks.</typeparam>
/// <remarks>The task runner is web-aware and will ensure that it shuts down correctly when the AppDomain
/// shuts down (ie is unloaded). FIXME WHAT DOES THAT MEAN?</remarks>
internal class BackgroundTaskRunner<T> : IBackgroundTaskRunner<T>
where T : class, IBackgroundTask
{
private readonly BackgroundTaskRunnerOptions _options;
private readonly BlockingCollection<T> _tasks = new BlockingCollection<T>();
private Task _consumer;
private readonly object _locker = new object();
private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);
private volatile bool _isRunning; // is running
private volatile bool _isCompleted; // does not accept tasks anymore, may still be running
private Task _runningTask;
private volatile bool _isRunning = false;
private static readonly object Locker = new object();
private CancellationTokenSource _tokenSource;
internal event EventHandler<TaskEventArgs<T>> TaskError;
internal event EventHandler<TaskEventArgs<T>> TaskStarting;
internal event EventHandler<TaskEventArgs<T>> TaskCompleted;
internal event EventHandler<TaskEventArgs<T>> TaskCancelled;
public BackgroundTaskRunner(bool dedicatedThread = false, bool persistentThread = false)
: this(new BackgroundTaskRunnerOptions{DedicatedThread = dedicatedThread, PersistentThread = persistentThread})
{
}
/// <summary>
/// Initializes a new instance of the <see cref="BackgroundTaskRunner{T}"/> class.
/// </summary>
public BackgroundTaskRunner()
: this(new BackgroundTaskRunnerOptions())
{ }
/// <summary>
/// Initializes a new instance of the <see cref="BackgroundTaskRunner{T}"/> class with a set of options.
/// </summary>
/// <param name="options">The set of options.</param>
public BackgroundTaskRunner(BackgroundTaskRunnerOptions options)
{
if (options == null) throw new ArgumentNullException("options");
_options = options;
HostingEnvironment.RegisterObject(this);
if (options.AutoStart)
StartUp();
}
/// <summary>
/// Gets the number of tasks in the queue.
/// </summary>
public int TaskCount
{
get { return _tasks.Count; }
}
/// <summary>
/// Gets a value indicating whether a task is currently running.
/// </summary>
public bool IsRunning
{
get { return _isRunning; }
}
public TaskStatus TaskStatus
/// <summary>
/// Gets a value indicating whether the runner has completed and cannot accept tasks anymore.
/// </summary>
public bool IsCompleted
{
get { return _consumer.Status; }
get { return _isCompleted; }
}
/// <summary>
/// Gets the status of the running task.
/// </summary>
/// <exception cref="InvalidOperationException">There is no running task.</exception>
/// <remarks>Unless the AutoStart option is true, there will be no running task until
/// a background task is added to the queue. Unless the KeepAlive option is true, there
/// will be no running task when the queue is empty.</remarks>
public TaskStatus TaskStatus
{
get
{
if (_runningTask == null)
throw new InvalidOperationException("There is no current task.");
return _runningTask.Status;
}
}
/// <summary>
/// Returns the task awaiter so that consumers of the BackgroundTaskManager can await
/// the threading operation.
/// Gets an awaiter used to await the running task.
/// </summary>
/// <returns></returns>
/// <returns>An awaiter for the running task.</returns>
/// <remarks>
/// This is just the coolest thing ever, check this article out:
/// http://blogs.msdn.com/b/pfxteam/archive/2011/01/13/10115642.aspx
@@ -70,267 +108,287 @@ namespace Umbraco.Web.Scheduling
/// So long as we have a method called GetAwaiter() that returns an instance of INotifyCompletion
/// we can await anything! :)
/// </remarks>
/// <exception cref="InvalidOperationException">There is no running task.</exception>
/// <remarks>Unless the AutoStart option is true, there will be no running task until
/// a background task is added to the queue. Unless the KeepAlive option is true, there
/// will be no running task when the queue is empty.</remarks>
public TaskAwaiter GetAwaiter()
{
return _consumer.GetAwaiter();
if (_runningTask == null)
throw new InvalidOperationException("There is no current task.");
return _runningTask.GetAwaiter();
}
/// <summary>
/// Adds a task to the queue.
/// </summary>
/// <param name="task">The task to add.</param>
/// <exception cref="InvalidOperationException">The task runner has completed.</exception>
public void Add(T task)
{
//add any tasks first
LogHelper.Debug<BackgroundTaskRunner<T>>(" Task added {0}", () => task.GetType());
_tasks.Add(task);
lock (_locker)
{
if (_isCompleted)
throw new InvalidOperationException("The task runner has completed.");
//ensure's everything is started
StartUp();
// add task
LogHelper.Debug<BackgroundTaskRunner<T>>("Task added {0}", task.GetType);
_tasks.Add(task);
// start
StartUpLocked();
}
}
/// <summary>
/// Tries to add a task to the queue.
/// </summary>
/// <param name="task">The task to add.</param>
/// <returns>true if the task could be added to the queue; otherwise false.</returns>
/// <remarks>Returns false if the runner is completed.</remarks>
public bool TryAdd(T task)
{
lock (_locker)
{
if (_isCompleted) return false;
// add task
LogHelper.Debug<BackgroundTaskRunner<T>>("Task added {0}", task.GetType);
_tasks.Add(task);
// start
StartUpLocked();
return true;
}
}
/// <summary>
/// Starts the tasks runner, if not already running.
/// </summary>
/// <remarks>Is invoked each time a task is added, to ensure it is going to be processed.</remarks>
/// <exception cref="InvalidOperationException">The task runner has completed.</exception>
public void StartUp()
{
if (!_isRunning)
if (_isRunning) return;
lock (_locker)
{
lock (Locker)
{
//double check
if (!_isRunning)
{
_isRunning = true;
//Create a new token source since this is a new proces
_tokenSource = new CancellationTokenSource();
StartConsumer();
LogHelper.Debug<BackgroundTaskRunner<T>>("Starting");
}
}
}
}
if (_isCompleted)
throw new InvalidOperationException("The task runner has completed.");
public void ShutDown()
{
lock (Locker)
{
_isRunning = false;
try
{
if (_consumer != null)
{
//cancel all operations
_tokenSource.Cancel();
try
{
_consumer.Wait();
}
catch (AggregateException e)
{
//NOTE: We are logging Debug because we are expecting these errors
LogHelper.Debug<BackgroundTaskRunner<T>>("AggregateException thrown with the following inner exceptions:");
// Display information about each exception.
foreach (var v in e.InnerExceptions)
{
var exception = v as TaskCanceledException;
if (exception != null)
{
LogHelper.Debug<BackgroundTaskRunner<T>>(" .Net TaskCanceledException: .Net Task ID {0}", () => exception.Task.Id);
}
else
{
LogHelper.Debug<BackgroundTaskRunner<T>>(" Exception: {0}", () => v.GetType().Name);
}
}
}
}
if (_tasks.Count > 0)
{
LogHelper.Debug<BackgroundTaskRunner<T>>("Processing remaining tasks before shutdown: {0}", () => _tasks.Count);
//now we need to ensure the remaining queue is processed if there's any remaining,
// this will all be processed on the current/main thread.
T remainingTask;
while (_tasks.TryTake(out remainingTask))
{
//skip if this is not the last
if (_options.OnlyProcessLastItem && _tasks.Count > 0)
{
//NOTE: don't raise canceled event, we're shutting down, just dispose
remainingTask.Dispose();
continue;
}
ConsumeTaskInternalAsync(remainingTask)
.Wait(); //block until it completes
}
}
LogHelper.Debug<BackgroundTaskRunner<T>>("Shutdown");
//disposing these is really optional since they'll be disposed immediately since they are no longer running
//but we'll put this here anyways.
if (_consumer != null && (_consumer.IsCompleted || _consumer.IsCanceled))
{
_consumer.Dispose();
}
}
catch (Exception ex)
{
LogHelper.Error<BackgroundTaskRunner<T>>("Error occurred shutting down task runner", ex);
}
finally
{
HostingEnvironment.UnregisterObject(this);
}
StartUpLocked();
}
}
/// <summary>
/// Starts the consumer task
/// Starts the tasks runner, if not already running.
/// </summary>
private void StartConsumer()
/// <remarks>Must be invoked within lock(_locker) and with _isCompleted being false.</remarks>
private void StartUpLocked()
{
var token = _tokenSource.Token;
_consumer = Task.Factory.StartNew(() =>
StartThreadAsync(token),
token,
_options.DedicatedThread ? TaskCreationOptions.LongRunning : TaskCreationOptions.None,
TaskScheduler.Default);
//if this is not a persistent thread, wait till it's done and shut ourselves down
// thus ending the thread or giving back to the thread pool. If another task is added
// another thread will spawn or be taken from the pool to process.
if (!_options.PersistentThread)
{
_consumer.ContinueWith(task => ShutDown());
}
// double check
if (_isRunning) return;
_isRunning = true;
// create a new token source since this is a new process
_tokenSource = new CancellationTokenSource();
_runningTask = PumpIBackgroundTasks(Task.Factory, _tokenSource.Token);
LogHelper.Debug<BackgroundTaskRunner<T>>("Starting");
}
/// <summary>
/// Invokes a new worker thread to consume tasks
/// Shuts the taks runner down.
/// </summary>
/// <param name="token"></param>
private async Task StartThreadAsync(CancellationToken token)
/// <param name="force">True for force the runner to stop.</param>
/// <param name="wait">True to wait until the runner has stopped.</param>
/// <remarks>If <paramref name="force"/> is false, no more tasks can be queued but all queued tasks
/// will run. If it is true, then only the current one (if any) will end and no other task will run.</remarks>
public void Shutdown(bool force, bool wait)
{
// Was cancellation already requested?
if (token.IsCancellationRequested)
lock (_locker)
{
LogHelper.Info<BackgroundTaskRunner<T>>("Thread {0} was cancelled before it got started.", () => Thread.CurrentThread.ManagedThreadId);
token.ThrowIfCancellationRequested();
_isCompleted = true; // do not accept new tasks
if (_isRunning == false) return; // done already
}
await TakeAndConsumeTaskAsync(token);
}
// try to be nice
// assuming multiple threads can do these without problems
_completedEvent.Set();
_tasks.CompleteAdding();
/// <summary>
/// Trys to get a task from the queue, if there isn't one it will wait a second and try again
/// </summary>
/// <param name="token"></param>
private async Task TakeAndConsumeTaskAsync(CancellationToken token)
{
if (token.IsCancellationRequested)
if (force)
{
LogHelper.Info<BackgroundTaskRunner<T>>("Thread {0} was cancelled.", () => Thread.CurrentThread.ManagedThreadId);
token.ThrowIfCancellationRequested();
}
//If this is true, the thread will stay alive and just wait until there is anything in the queue
// and process it. When there is nothing in the queue, the thread will just block until there is
// something to process.
//When this is false, the thread will process what is currently in the queue and once that is
// done, the thread will end and we will shutdown the process
if (_options.PersistentThread)
{
//This will iterate over the collection, if there is nothing to take
// the thread will block until there is something available.
//We need to pass our cancellation token so that the thread will
// cancel when we shutdown
foreach (var t in _tasks.GetConsumingEnumerable(token))
// we must bring everything down, now
Thread.Sleep(100); // give time to CompleAdding()
lock (_locker)
{
//skip if this is not the last
if (_options.OnlyProcessLastItem && _tasks.Count > 0)
{
OnTaskCancelled(new TaskEventArgs<T>(t));
continue;
}
await ConsumeTaskCancellableAsync(t, token);
// was CompleteAdding() enough?
if (_isRunning == false) return;
}
//recurse and keep going
await TakeAndConsumeTaskAsync(token);
// try to cancel running async tasks (cannot do much about sync tasks)
// break delayed tasks delay
// truncate running queues
_tokenSource.Cancel(false); // false is the default
}
else
{
T t;
while (_tasks.TryTake(out t))
{
//skip if this is not the last
if (_options.OnlyProcessLastItem && _tasks.Count > 0)
{
OnTaskCancelled(new TaskEventArgs<T>(t));
continue;
}
await ConsumeTaskCancellableAsync(t, token);
}
//the task will end here
// tasks in the queue will be executed...
if (wait == false) return;
_runningTask.Wait(); // wait for whatever is running to end...
}
}
internal async Task ConsumeTaskCancellableAsync(T task, CancellationToken token)
/// <summary>
/// Runs background tasks for as long as there are background tasks in the queue, with an asynchronous operation.
/// </summary>
/// <param name="factory">The supporting <see cref="TaskFactory"/>.</param>
/// <param name="token">A cancellation token.</param>
/// <returns>The asynchronous operation.</returns>
private Task PumpIBackgroundTasks(TaskFactory factory, CancellationToken token)
{
var taskSource = new TaskCompletionSource<object>(factory.CreationOptions);
var enumerator = _options.KeepAlive ? _tasks.GetConsumingEnumerable(token).GetEnumerator() : null;
// ReSharper disable once MethodSupportsCancellation // always run
var taskSourceContinuing = taskSource.Task.ContinueWith(t =>
{
// because the pump does not lock, there's a race condition,
// the pump may stop and then we still have tasks to process,
// and then we must restart the pump - lock to avoid race cond
lock (_locker)
{
if (token.IsCancellationRequested || _tasks.Count == 0)
{
_isRunning = false; // done
if (_options.PreserveRunningTask == false)
_runningTask = null;
return;
}
}
// if _runningTask is taskSource.Task then we must keep continuing it,
// not starting a new taskSource, else _runningTask would complete and
// something may be waiting on it
//PumpIBackgroundTasks(factory, token); // restart
// ReSharper disable once MethodSupportsCancellation // always run
t.ContinueWithTask(_ => PumpIBackgroundTasks(factory, token)); // restart
});
Action<Task> pump = null;
pump = task =>
{
// RunIBackgroundTaskAsync does NOT throw exceptions, just raises event
// so if we have an exception here, really, wtf? - must read the exception
// anyways so it does not bubble up and kill everything
if (task != null && task.IsFaulted)
{
var exception = task.Exception;
LogHelper.Error<BackgroundTaskRunner<T>>("Task runner exception.", exception);
}
// is it ok to run?
if (TaskSourceCanceled(taskSource, token)) return;
// try to get a task
// the blocking MoveNext will end if token is cancelled or collection is completed
T bgTask;
var hasBgTask = _options.KeepAlive
? (bgTask = enumerator.MoveNext() ? enumerator.Current : null) != null // blocking
: _tasks.TryTake(out bgTask); // non-blocking
// no task, signal the runner we're done
if (hasBgTask == false)
{
TaskSourceCompleted(taskSource, token);
return;
}
// wait for delayed task, supporting cancellation
var dbgTask = bgTask as IDelayedBackgroundTask;
if (dbgTask != null && dbgTask.IsDelayed)
{
WaitHandle.WaitAny(new[] { dbgTask.DelayWaitHandle, token.WaitHandle, _completedEvent });
if (TaskSourceCanceled(taskSource, token)) return;
// else run now, either because delay is ok or runner is completed
}
// run the task as first task, or a continuation
task = task == null
? RunIBackgroundTaskAsync(bgTask, token)
// ReSharper disable once MethodSupportsCancellation // always run
: task.ContinueWithTask(_ => RunIBackgroundTaskAsync(bgTask, token));
// and pump
// ReSharper disable once MethodSupportsCancellation // always run
task.ContinueWith(t => pump(t));
};
// start it all
factory.StartNew(() => pump(null),
token,
_options.LongRunning ? TaskCreationOptions.LongRunning : TaskCreationOptions.None,
TaskScheduler.Default);
return taskSourceContinuing;
}
private bool TaskSourceCanceled(TaskCompletionSource<object> taskSource, CancellationToken token)
{
if (token.IsCancellationRequested)
{
OnTaskCancelled(new TaskEventArgs<T>(task));
//NOTE: Since the task hasn't started this is pretty pointless so leaving it out.
LogHelper.Info<BackgroundTaskRunner<T>>("Task {0}) was cancelled.",
() => task.GetType());
token.ThrowIfCancellationRequested();
taskSource.SetCanceled();
return true;
}
await ConsumeTaskInternalAsync(task);
return false;
}
private async Task ConsumeTaskInternalAsync(T task)
private void TaskSourceCompleted(TaskCompletionSource<object> taskSource, CancellationToken token)
{
if (token.IsCancellationRequested)
taskSource.SetCanceled();
else
taskSource.SetResult(null);
}
/// <summary>
/// Runs a background task asynchronously.
/// </summary>
/// <param name="bgTask">The background task.</param>
/// <param name="token">A cancellation token.</param>
/// <returns>The asynchronous operation.</returns>
internal async Task RunIBackgroundTaskAsync(T bgTask, CancellationToken token)
{
try
{
OnTaskStarting(new TaskEventArgs<T>(task));
OnTaskStarting(new TaskEventArgs<T>(bgTask));
try
{
using (task)
using (bgTask) // ensure it's disposed
{
if (task.IsAsync)
{
await task.RunAsync();
}
if (bgTask.IsAsync)
await bgTask.RunAsync(); // fixme should pass the token along?!
else
{
task.Run();
}
bgTask.Run();
}
}
catch (Exception e)
{
OnTaskError(new TaskEventArgs<T>(task, e));
OnTaskError(new TaskEventArgs<T>(bgTask, e));
throw;
}
OnTaskCompleted(new TaskEventArgs<T>(task));
Console.WriteLine("!1");
OnTaskCompleted(new TaskEventArgs<T>(bgTask));
}
catch (Exception ex)
{
LogHelper.Error<BackgroundTaskRunner<T>>("An error occurred consuming task", ex);
LogHelper.Error<BackgroundTaskRunner<T>>("Task has failed.", ex);
}
}
#region Events
protected virtual void OnTaskError(TaskEventArgs<T> e)
{
var handler = TaskError;
@@ -358,8 +416,10 @@ namespace Umbraco.Web.Scheduling
e.Task.Dispose();
}
#endregion
#region IDisposable
#region Disposal
private readonly object _disposalLocker = new object();
public bool IsDisposed { get; private set; }
@@ -376,8 +436,9 @@ namespace Umbraco.Web.Scheduling
protected virtual void Dispose(bool disposing)
{
if (this.IsDisposed || !disposing)
if (this.IsDisposed || disposing == false)
return;
lock (_disposalLocker)
{
if (IsDisposed)
@@ -389,31 +450,60 @@ namespace Umbraco.Web.Scheduling
protected virtual void DisposeResources()
{
ShutDown();
// just make sure we eventually go down
Shutdown(true, false);
}
#endregion
/// <summary>
/// Requests a registered object to unregister.
/// </summary>
/// <param name="immediate">true to indicate the registered object should unregister from the hosting
/// environment before returning; otherwise, false.</param>
/// <remarks>
/// <para>"When the application manager needs to stop a registered object, it will call the Stop method."</para>
/// <para>The application manager will call the Stop method to ask a registered object to unregister. During
/// processing of the Stop method, the registered object must call the HostingEnvironment.UnregisterObject method.</para>
/// </remarks>
public void Stop(bool immediate)
{
if (immediate == false)
{
LogHelper.Debug<BackgroundTaskRunner<T>>("Application is shutting down, waiting for tasks to complete");
Dispose();
// The Stop method is first called with the immediate parameter set to false. The object can either complete
// processing, call the UnregisterObject method, and then return or it can return immediately and complete
// processing asynchronously before calling the UnregisterObject method.
LogHelper.Debug<BackgroundTaskRunner<T>>("Shutting down, waiting for tasks to complete.");
Shutdown(false, false); // do not accept any more tasks, flush the queue, do not wait
lock (_locker)
{
if (_runningTask != null)
_runningTask.ContinueWith(_ =>
{
HostingEnvironment.UnregisterObject(this);
LogHelper.Info<BackgroundTaskRunner<T>>("Down, tasks completed.");
});
else
{
HostingEnvironment.UnregisterObject(this);
LogHelper.Info<BackgroundTaskRunner<T>>("Down, tasks completed.");
}
}
}
else
{
//NOTE: this will thread block the current operation if the manager
// is still shutting down because the Shutdown operation is also locked
// by this same lock instance. This would only matter if Stop is called by ASP.Net
// on two different threads though, otherwise the current thread will just block normally
// until the app is shutdown
lock (Locker)
{
LogHelper.Info<BackgroundTaskRunner<T>>("Application is shutting down immediately");
}
// If the registered object does not complete processing before the application manager's time-out
// period expires, the Stop method is called again with the immediate parameter set to true. When the
// immediate parameter is true, the registered object must call the UnregisterObject method before returning;
// otherwise, its registration will be removed by the application manager.
LogHelper.Info<BackgroundTaskRunner<T>>("Shutting down immediately.");
Shutdown(true, true); // cancel all tasks, wait for the current one to end
HostingEnvironment.UnregisterObject(this);
LogHelper.Info<BackgroundTaskRunner<T>>("Down.");
}
}
}
}

View File

@@ -1,23 +1,44 @@
namespace Umbraco.Web.Scheduling
{
/// <summary>
/// Provides options to the <see cref="BackgroundTaskRunner{T}"/> class.
/// </summary>
internal class BackgroundTaskRunnerOptions
{
//TODO: Could add options for using a stack vs queue if required
/// <summary>
/// Initializes a new instance of the <see cref="BackgroundTaskRunnerOptions"/> class.
/// </summary>
public BackgroundTaskRunnerOptions()
{
DedicatedThread = false;
PersistentThread = false;
OnlyProcessLastItem = false;
LongRunning = false;
KeepAlive = false;
AutoStart = false;
}
public bool DedicatedThread { get; set; }
public bool PersistentThread { get; set; }
/// <summary>
/// Gets or sets a value indicating whether the running task should be a long-running,
/// coarse grained operation.
/// </summary>
public bool LongRunning { get; set; }
/// <summary>
/// If this is true, the task runner will skip over all items and only process the last/final
/// item registered
/// Gets or sets a value indicating whether the running task should block and wait
/// on the queue, or end, when the queue is empty.
/// </summary>
public bool OnlyProcessLastItem { get; set; }
public bool KeepAlive { get; set; }
/// <summary>
/// Gets or sets a value indicating whether the running task should start immediately
/// or only once a task has been added to the queue.
/// </summary>
public bool AutoStart { get; set; }
/// <summary>
/// Gets or setes a value indicating whether the running task should be preserved
/// once completed, or reset to null. For unit tests.
/// </summary>
public bool PreserveRunningTask { get; set; }
}
}

View File

@@ -0,0 +1,52 @@
using System;
using System.Threading;
namespace Umbraco.Web.Scheduling
{
/// <summary>
/// Provides a base class for recurring background tasks.
/// </summary>
/// <typeparam name="T">The type of the managed tasks.</typeparam>
internal abstract class DelayedRecurringTaskBase<T> : RecurringTaskBase<T>, IDelayedBackgroundTask
where T : class, IBackgroundTask
{
private readonly int _delayMilliseconds;
private ManualResetEvent _gate;
private Timer _timer;
protected DelayedRecurringTaskBase(IBackgroundTaskRunner<T> runner, int delayMilliseconds, int periodMilliseconds)
: base(runner, periodMilliseconds)
{
_delayMilliseconds = delayMilliseconds;
}
protected DelayedRecurringTaskBase(DelayedRecurringTaskBase<T> source)
: base(source)
{
_delayMilliseconds = 0;
}
public WaitHandle DelayWaitHandle
{
get
{
if (_delayMilliseconds == 0) return new ManualResetEvent(true);
if (_gate != null) return _gate;
_gate = new ManualResetEvent(false);
_timer = new Timer(_ =>
{
_timer.Dispose();
_timer = null;
_gate.Set();
}, null, _delayMilliseconds, 0);
return _gate;
}
}
public bool IsDelayed
{
get { return _delayMilliseconds > 0; }
}
}
}

View File

@@ -3,10 +3,26 @@ using System.Threading.Tasks;
namespace Umbraco.Web.Scheduling
{
/// <summary>
/// Represents a background task.
/// </summary>
internal interface IBackgroundTask : IDisposable
{
/// <summary>
/// Runs the background task.
/// </summary>
void Run();
/// <summary>
/// Runs the task asynchronously.
/// </summary>
/// <returns>A <see cref="Task"/> instance representing the execution of the background task.</returns>
/// <exception cref="NotImplementedException">The background task cannot run asynchronously.</exception>
Task RunAsync();
/// <summary>
/// Indicates whether the background task can run asynchronously.
/// </summary>
bool IsAsync { get; }
}
}

View File

@@ -0,0 +1,20 @@
using System;
using System.Web.Hosting;
namespace Umbraco.Web.Scheduling
{
/// <summary>
/// Defines a service managing a queue of tasks of type <typeparamref name="T"/> and running them in the background.
/// </summary>
/// <typeparam name="T">The type of the managed tasks.</typeparam>
/// <remarks>The interface is not complete and exists only to have the contravariance on T.</remarks>
internal interface IBackgroundTaskRunner<in T> : IDisposable, IRegisteredObject
where T : class, IBackgroundTask
{
bool IsCompleted { get; }
void Add(T task);
bool TryAdd(T task);
// fixme - complete the interface?
}
}

View File

@@ -0,0 +1,23 @@
using System.Threading;
namespace Umbraco.Web.Scheduling
{
/// <summary>
/// Represents a delayed background task.
/// </summary>
/// <remarks>Delayed background tasks can suspend their execution until
/// a condition is met. However if the tasks runner has to terminate,
/// delayed background tasks are executed immediately.</remarks>
internal interface IDelayedBackgroundTask : IBackgroundTask
{
/// <summary>
/// Gets a wait handle on the task condition.
/// </summary>
WaitHandle DelayWaitHandle { get; }
/// <summary>
/// Gets a value indicating whether the task is delayed.
/// </summary>
bool IsDelayed { get; }
}
}

View File

@@ -0,0 +1,108 @@
using System.Threading;
using System.Threading.Tasks;
namespace Umbraco.Web.Scheduling
{
/// <summary>
/// Provides a base class for recurring background tasks.
/// </summary>
/// <typeparam name="T">The type of the managed tasks.</typeparam>
internal abstract class RecurringTaskBase<T> : IBackgroundTask
where T : class, IBackgroundTask
{
private readonly IBackgroundTaskRunner<T> _runner;
private readonly int _periodMilliseconds;
private Timer _timer;
/// <summary>
/// Initializes a new instance of the <see cref="RecurringTaskBase{T}"/> class with a tasks runner and a period.
/// </summary>
/// <param name="runner">The task runner.</param>
/// <param name="periodMilliseconds">The period.</param>
/// <remarks>The task will repeat itself periodically. Use this constructor to create a new task.</remarks>
protected RecurringTaskBase(IBackgroundTaskRunner<T> runner, int periodMilliseconds)
{
_runner = runner;
_periodMilliseconds = periodMilliseconds;
}
/// <summary>
/// Initializes a new instance of the <see cref="RecurringTaskBase{T}"/> class with a source task.
/// </summary>
/// <param name="source">The source task.</param>
/// <remarks>Use this constructor to create a new task from a source task in <c>GetRecurring</c>.</remarks>
protected RecurringTaskBase(RecurringTaskBase<T> source)
{
_runner = source._runner;
_periodMilliseconds = source._periodMilliseconds;
}
/// <summary>
/// Implements IBackgroundTask.Run().
/// </summary>
/// <remarks>Classes inheriting from <c>RecurringTaskBase</c> must implement <c>PerformRun</c>.</remarks>
public void Run()
{
PerformRun();
Repeat();
}
/// <summary>
/// Implements IBackgroundTask.RunAsync().
/// </summary>
/// <remarks>Classes inheriting from <c>RecurringTaskBase</c> must implement <c>PerformRun</c>.</remarks>
public async Task RunAsync()
{
await PerformRunAsync();
Repeat();
}
private void Repeat()
{
// again?
if (_runner.IsCompleted) return; // fail fast
if (_periodMilliseconds == 0) return;
var recur = GetRecurring();
if (recur == null) return; // done
_timer = new Timer(_ =>
{
_timer.Dispose();
_timer = null;
_runner.TryAdd(recur);
}, null, _periodMilliseconds, 0);
}
/// <summary>
/// Indicates whether the background task can run asynchronously.
/// </summary>
public abstract bool IsAsync { get; }
/// <summary>
/// Runs the background task.
/// </summary>
public abstract void PerformRun();
/// <summary>
/// Runs the task asynchronously.
/// </summary>
/// <returns>A <see cref="Task"/> instance representing the execution of the background task.</returns>
public abstract Task PerformRunAsync();
/// <summary>
/// Gets a new occurence of the recurring task.
/// </summary>
/// <returns>A new task instance to be queued, or <c>null</c> to terminate the recurring task.</returns>
/// <remarks>The new task instance must be created via the <c>RecurringTaskBase(RecurringTaskBase{T} source)</c> constructor,
/// where <c>source</c> is the current task, eg: <c>return new MyTask(this);</c></remarks>
protected abstract T GetRecurring();
/// <summary>
/// Dispose the task.
/// </summary>
public virtual void Dispose()
{ }
}
}

View File

@@ -0,0 +1,62 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Umbraco.Web.Scheduling
{
internal static class TaskAndFactoryExtensions
{
#region Task Extensions
static void SetCompletionSource<TResult>(TaskCompletionSource<TResult> completionSource, Task task)
{
if (task.IsFaulted)
completionSource.SetException(task.Exception.InnerException);
else
completionSource.SetResult(default(TResult));
}
static void SetCompletionSource<TResult>(TaskCompletionSource<TResult> completionSource, Task<TResult> task)
{
if (task.IsFaulted)
completionSource.SetException(task.Exception.InnerException);
else
completionSource.SetResult(task.Result);
}
public static Task ContinueWithTask(this Task task, Func<Task, Task> continuation)
{
var completionSource = new TaskCompletionSource<object>();
task.ContinueWith(atask => continuation(atask).ContinueWith(atask2 => SetCompletionSource(completionSource, atask2)));
return completionSource.Task;
}
public static Task ContinueWithTask(this Task task, Func<Task, Task> continuation, CancellationToken token)
{
var completionSource = new TaskCompletionSource<object>();
task.ContinueWith(atask => continuation(atask).ContinueWith(atask2 => SetCompletionSource(completionSource, atask2), token), token);
return completionSource.Task;
}
#endregion
#region TaskFactory Extensions
public static Task Completed(this TaskFactory factory)
{
var taskSource = new TaskCompletionSource<object>();
taskSource.SetResult(null);
return taskSource.Task;
}
public static Task Sync(this TaskFactory factory, Action action)
{
var taskSource = new TaskCompletionSource<object>();
action();
taskSource.SetResult(null);
return taskSource.Task;
}
#endregion
}
}

View File

@@ -499,6 +499,11 @@
<Compile Include="Routing\CustomRouteUrlProvider.cs" />
<Compile Include="Routing\UrlProviderExtensions.cs" />
<Compile Include="Scheduling\BackgroundTaskRunnerOptions.cs" />
<Compile Include="Scheduling\DelayedRecurringTaskBase.cs" />
<Compile Include="Scheduling\IBackgroundTaskRunner.cs" />
<Compile Include="Scheduling\IDelayedBackgroundTask.cs" />
<Compile Include="Scheduling\RecurringTaskBase.cs" />
<Compile Include="Scheduling\TaskAndFactoryExtensions.cs" />
<Compile Include="Strategies\Migrations\ClearCsrfCookiesAfterUpgrade.cs" />
<Compile Include="Strategies\Migrations\ClearMediaXmlCacheForDeletedItemsAfterUpgrade.cs" />
<Compile Include="Strategies\Migrations\EnsureListViewDataTypeIsCreated.cs" />

View File

@@ -33,7 +33,7 @@ namespace umbraco
public class content
{
private static readonly BackgroundTaskRunner<XmlCacheFilePersister> FilePersister
= new BackgroundTaskRunner<XmlCacheFilePersister>(new BackgroundTaskRunnerOptions {DedicatedThread = true, OnlyProcessLastItem = true});
= new BackgroundTaskRunner<XmlCacheFilePersister>(new BackgroundTaskRunnerOptions { LongRunning = true });
#region Declarations