using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Web.Hosting;
using Umbraco.Core;
using Umbraco.Core.Events;
using Umbraco.Core.Logging;
namespace Umbraco.Web.Scheduling
{
/// <summary>
/// Manages a queue of tasks and runs them in the background.
/// </summary>
/// <remarks>This class exists for logging purposes - the one you want to use is BackgroundTaskRunner{T}.
/// It declares no members of its own.</remarks>
public abstract class BackgroundTaskRunner
{ }
/// <summary>
/// Manages a queue of tasks of type <typeparamref name="T"/> and runs them in the background.
/// </summary>
/// <typeparam name="T">The type of the managed tasks.</typeparam>
/// <remarks>The task runner is web-aware and will ensure that it shuts down correctly when the AppDomain
/// shuts down (i.e. is unloaded).</remarks>
public class BackgroundTaskRunner : BackgroundTaskRunner, IBackgroundTaskRunner
where T : class, IBackgroundTask
{
// prefix prepended to every log message, built as "[" + runner name + "] "
private readonly string _logPrefix;
// runner options; this class reads Hosted, AutoStart, KeepAlive and PreserveRunningTask
private readonly BackgroundTaskRunnerOptions _options;
private readonly ILogger _logger;
// guards the mutable runner state below (running task, token sources, termination flags)
private readonly object _locker = new object();
// queue of pending background tasks
// NOTE(review): the generic type arguments appear to be missing on this declaration, on _enumerator
// and on _terminatedSource - confirm against the original file
private readonly BlockingCollection _tasks = new BlockingCollection();
// consuming enumerator over _tasks; only created when the KeepAlive option is set, null otherwise
private IEnumerator _enumerator;
// that event is used to stop the pump when it is alive and waiting
// on a latched task - so it waits on the latch, the cancellation token,
// and the completed event
private readonly ManualResetEventSlim _completedEvent = new ManualResetEventSlim(false);
// in various places we are testing these vars outside a lock, so make them volatile
private volatile bool _isRunning; // is running
private volatile bool _completed; // does not accept tasks anymore, may still be running
private Task _runningTask; // the threading task that is currently executing background tasks
private CancellationTokenSource _shutdownTokenSource; // used to cancel everything and shutdown
private CancellationTokenSource _cancelTokenSource; // used to cancel the current task
private CancellationToken _shutdownToken; // token of _shutdownTokenSource; reset to CancellationToken.None when the pump stops
private bool _terminating; // ensures we raise that event only once
private bool _terminated; // remember we've terminated
private TaskCompletionSource _terminatedSource; // enable awaiting termination
/// <summary>
/// Initializes a new instance of the <see cref="BackgroundTaskRunner{T}"/> class.
/// </summary>
/// <param name="logger">A logger.</param>
/// <param name="mainDomInstall">An optional action to execute when the main domain status is acquired.</param>
/// <param name="mainDomRelease">An optional action to execute when the main domain status is released.</param>
/// <remarks>The runner is named after the full name of the task type and uses a default
/// <see cref="BackgroundTaskRunnerOptions"/> instance.</remarks>
public BackgroundTaskRunner(ILogger logger, Action mainDomInstall = null, Action mainDomRelease = null)
: this(typeof (T).FullName, new BackgroundTaskRunnerOptions(), logger, mainDomInstall, mainDomRelease)
{ }
/// <summary>
/// Initializes a new instance of the <see cref="BackgroundTaskRunner{T}"/> class with a name.
/// </summary>
/// <param name="name">The name of the runner.</param>
/// <param name="logger">A logger.</param>
/// <param name="mainDomInstall">An optional action to execute when the main domain status is acquired.</param>
/// <param name="mainDomRelease">An optional action to execute when the main domain status is released.</param>
/// <remarks>Uses a default <see cref="BackgroundTaskRunnerOptions"/> instance.</remarks>
public BackgroundTaskRunner(string name, ILogger logger, Action mainDomInstall = null, Action mainDomRelease = null)
: this(name, new BackgroundTaskRunnerOptions(), logger, mainDomInstall, mainDomRelease)
{ }
/// <summary>
/// Initializes a new instance of the <see cref="BackgroundTaskRunner{T}"/> class with a set of options.
/// </summary>
/// <param name="options">The set of options.</param>
/// <param name="logger">A logger.</param>
/// <param name="mainDomInstall">An optional action to execute when the main domain status is acquired.</param>
/// <param name="mainDomRelease">An optional action to execute when the main domain status is released.</param>
/// <remarks>The runner is named after the full name of the task type.</remarks>
public BackgroundTaskRunner(BackgroundTaskRunnerOptions options, ILogger logger, Action mainDomInstall = null, Action mainDomRelease = null)
: this(typeof (T).FullName, options, logger, mainDomInstall, mainDomRelease)
{ }
/// <summary>
/// Initializes a new instance of the <see cref="BackgroundTaskRunner{T}"/> class with a name and a set of options.
/// </summary>
/// <param name="name">The name of the runner; used to build the log prefix.</param>
/// <param name="options">The set of options.</param>
/// <param name="logger">A logger.</param>
/// <param name="mainDomInstall">An optional action to execute when the main domain status is acquired.</param>
/// <param name="mainDomRelease">An optional action to execute when the main domain status is released.</param>
/// <exception cref="ArgumentNullException"><paramref name="options"/> or <paramref name="logger"/> is null.</exception>
/// <remarks>
/// When the Hosted option is set, the runner registers itself with the hosting environment.
/// When main domain actions are supplied and registration with the main domain is refused, the
/// runner is created in the completed and terminated state and is not started.
/// </remarks>
public BackgroundTaskRunner(string name, BackgroundTaskRunnerOptions options, ILogger logger, Action mainDomInstall = null, Action mainDomRelease = null)
{
    if (options == null) throw new ArgumentNullException("options");
    if (logger == null) throw new ArgumentNullException("logger");

    _options = options;
    _logPrefix = "[" + name + "] ";
    _logger = logger;

    if (options.Hosted)
        HostingEnvironment.RegisterObject(this);

    if (mainDomInstall != null || mainDomRelease != null)
    {
        var appContext = ApplicationContext.Current;
        var mainDom = appContext == null ? null : appContext.MainDom;

        // register through the instance that was just null-checked, rather than reading
        // ApplicationContext.Current a second time (it could differ, or be null, by then)
        var reg = mainDom == null || mainDom.Register(mainDomInstall, mainDomRelease);

        // registration refused: the runner must never accept nor run tasks
        if (reg == false)
            _completed = _terminated = true;

        // no main domain at all: nothing will ever invoke the install action, so run it now
        if (reg && mainDom == null && mainDomInstall != null)
            mainDomInstall();
    }

    if (options.AutoStart && _terminated == false)
        StartUp();
}
/// <summary>
/// Gets the number of tasks in the queue.
/// </summary>
/// <remarks>Returns the current count of the underlying task collection; a task that has already
/// been taken out of the queue by the pump is not included.</remarks>
public int TaskCount
{
get { return _tasks.Count; }
}
/// <summary>
/// Gets a value indicating whether a threading task is currently running.
/// </summary>
/// <remarks>Reads a volatile flag without taking the lock.</remarks>
public bool IsRunning
{
get { return _isRunning; }
}
/// <summary>
/// Gets a value indicating whether the runner has completed and cannot accept tasks anymore.
/// </summary>
/// <remarks>Reads a volatile flag without taking the lock. A completed runner may still be
/// running the tasks that were queued before it completed.</remarks>
public bool IsCompleted
{
get { return _completed; }
}
/// <summary>
/// Gets the running threading task wrapped as an immutable awaitable.
/// </summary>
/// <value>A wrapper around the running threading task, or null when there is none.</value>
/// <remarks>
/// Unless the AutoStart option is true, there will be no current threading task until
/// a background task is added to the queue, and there will be no current threading task
/// when the queue is empty. In which case this property returns null.
/// The returned value can be awaited and that is all (eg no continuation).
/// </remarks>
internal ThreadingTaskImmutable CurrentThreadingTask
{
    get
    {
        lock (_locker)
        {
            var current = _runningTask;
            if (current == null)
                return null;
            return new ThreadingTaskImmutable(current);
        }
    }
}
// fixme should the above throw, return null, a completed task?
// fixme what's the diff?!
/// <summary>
/// Gets an awaitable used to await the end of the runner's current run.
/// </summary>
/// <value>A wrapper around the running threading task, or around an already-completed task
/// when nothing is running.</value>
/// <remarks>Used to wait until the runner is no longer running (IsRunning == false),
/// though the runner could be started again afterwards by adding tasks to it.</remarks>
internal ThreadingTaskImmutable StoppedAwaitable
{
    get
    {
        lock (_locker)
        {
            Task awaited = _runningTask;
            if (awaited == null)
                awaited = Task.FromResult(0); // nothing is running: hand out a completed task
            return new ThreadingTaskImmutable(awaited);
        }
    }
}
/// <summary>
/// Gets an awaitable object that can be used to await for the runner to terminate.
/// </summary>
/// <value>An awaitable object; already completed when the runner has terminated.</value>
/// <remarks>
/// Used to wait until the runner has terminated.
/// This is for unit tests and should not be used otherwise. In most cases when the runner
/// has terminated, the application domain is going down and it is not the right time to do things.
/// The property can safely be read any number of times, before or after termination.
/// </remarks>
internal ThreadingTaskImmutable TerminatedAwaitable
{
    get
    {
        lock (_locker)
        {
            if (_terminatedSource == null)
                _terminatedSource = new TaskCompletionSource();

            // the source may already have been completed, by a previous read of this property
            // or by the termination code - TrySetResult is a no-op in that case whereas
            // SetResult would throw InvalidOperationException
            if (_terminated)
                _terminatedSource.TrySetResult(0);

            return new ThreadingTaskImmutable(_terminatedSource.Task);
        }
    }
}
/// <summary>
/// Adds a task to the queue and makes sure the runner is started.
/// </summary>
/// <param name="task">The task to add.</param>
/// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
/// <exception cref="InvalidOperationException">The task runner has completed.</exception>
public void Add(T task)
{
    // fail early with a meaningful exception instead of failing on task.GetType below
    if (task == null) throw new ArgumentNullException("task");

    lock (_locker)
    {
        if (_completed)
            throw new InvalidOperationException("The task runner has completed.");

        // add task
        _logger.Debug(_logPrefix + "Task added {0}", task.GetType);
        _tasks.Add(task);

        // start - does nothing if the runner is already running
        StartUpLocked();
    }
}
/// <summary>
/// Tries to add a task to the queue, and makes sure the runner is started when it succeeds.
/// </summary>
/// <param name="task">The task to add.</param>
/// <returns>true if the task could be added to the queue; otherwise false.</returns>
/// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
/// <remarks>Returns false if the runner is completed.</remarks>
public bool TryAdd(T task)
{
    // fail early with a meaningful exception instead of failing on task.GetType below
    if (task == null) throw new ArgumentNullException("task");

    lock (_locker)
    {
        if (_completed)
        {
            _logger.Debug(_logPrefix + "Task cannot be added {0}, the task runner has already shutdown", task.GetType);
            return false;
        }

        // add task
        _logger.Debug(_logPrefix + "Task added {0}", task.GetType);
        _tasks.Add(task);

        // start - does nothing if the runner is already running
        StartUpLocked();
        return true;
    }
}
/// <summary>
/// Cancels the current task, if any.
/// </summary>
/// <exception cref="InvalidOperationException">The task runner has completed.</exception>
/// <remarks>Has no effect if the task runs synchronously, or does not want to cancel.</remarks>
public void CancelCurrentBackgroundTask()
{
    lock (_locker)
    {
        if (_completed)
            throw new InvalidOperationException("The task runner has completed.");

        // read the field once: the pump clears it when a task ends, so testing the field
        // and then dereferencing it again could hit null in between
        var cancelSource = _cancelTokenSource;
        if (cancelSource != null)
            cancelSource.Cancel();
    }
}
/// <summary>
/// Starts the tasks runner, if not already running.
/// </summary>
/// <exception cref="InvalidOperationException">The task runner has completed.</exception>
/// <remarks>The running flag is first tested outside the lock as a cheap shortcut; the actual
/// start happens under the lock, where the flag is tested again.</remarks>
internal void StartUp()
{
    if (_isRunning == false)
    {
        lock (_locker)
        {
            if (_completed)
                throw new InvalidOperationException("The task runner has completed.");

            StartUpLocked();
        }
    }
}
/// <summary>
/// Starts the tasks runner, if not already running.
/// </summary>
/// <remarks>Must be invoked within lock(_locker) and with the runner not being completed.</remarks>
private void StartUpLocked()
{
    // already running: nothing to start
    if (_isRunning)
        return;

    _isRunning = true;

    // each run gets its own shutdown token source
    var shutdownSource = new CancellationTokenSource();
    _shutdownTokenSource = shutdownSource;
    _shutdownToken = shutdownSource.Token;

    // the blocking consuming enumerator is only needed when the pump must stay alive
    if (_options.KeepAlive)
        _enumerator = _tasks.GetConsumingEnumerable(_shutdownToken).GetEnumerator();
    else
        _enumerator = null;

    _runningTask = Task.Run(async () => await Pump(), _shutdownToken);

    _logger.Debug(_logPrefix + "Starting");
}
/// <summary>
/// Shuts the tasks runner down.
/// </summary>
/// <param name="force">True to force the runner to stop.</param>
/// <param name="wait">True to wait until the runner has stopped.</param>
/// <remarks>If <paramref name="force"/> is false, no more tasks can be queued but all queued tasks
/// will run. If it is true, then only the current one (if any) will end and no other task will run.</remarks>
public void Shutdown(bool force, bool wait)
{
    lock (_locker)
    {
        _completed = true; // do not accept new tasks
        if (_isRunning == false) return; // done already
    }

    // try to be nice
    // assuming multiple threads can do these without problems
    _completedEvent.Set();
    _tasks.CompleteAdding();

    if (force)
    {
        // we must bring everything down, now
        Thread.Sleep(100); // give time to CompleteAdding()

        CancellationTokenSource shutdownSource;
        lock (_locker)
        {
            // was CompleteAdding() enough?
            if (_isRunning == false) return;
            shutdownSource = _shutdownTokenSource;
        }

        // try to cancel running async tasks (cannot do much about sync tasks)
        // break delayed tasks delay
        // truncate running queues
        shutdownSource.Cancel(false); // false is the default
    }

    // tasks in the queue will be executed...
    if (wait == false) return;

    // take a snapshot of the running task: the pump may set the field to null when it
    // stops, so testing the field and then dereferencing it again is not safe
    Task runningTask;
    lock (_locker)
    {
        runningTask = _runningTask;
    }

    if (runningTask != null)
        runningTask.Wait(); // wait for whatever is running to end...
}
/// <summary>
/// The runner's main loop: takes background tasks one after the other and runs them.
/// </summary>
/// <returns>A task that completes when there is no more background task to run.</returns>
/// <remarks>
/// The loop only ends when <c>GetNextBackgroundTask</c> returns null, because that method is
/// the one that resets the running state and raises the Stopped event. A latched task that
/// is given up on is therefore skipped, not treated as the end of the run.
/// </remarks>
private async Task Pump()
{
    while (true)
    {
        // null means there is nothing left to run, and the running state has been reset
        var bgTask = GetNextBackgroundTask();
        if (bgTask == null)
            return;

        // set another one - for the next task
        // linked to the shutdown token, so it can be used both to cancel the
        // current task only, and to shut everything down
        CancellationTokenSource cancelSource;
        lock (_locker)
        {
            cancelSource = _cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_shutdownToken);
        }

        try
        {
            // null means the wait was given up on (task cancelled, or runner going down):
            // loop, and let GetNextBackgroundTask decide whether the runner must stop
            bgTask = WaitForLatch(bgTask, cancelSource.Token);
            if (bgTask == null) continue;

            try
            {
                await RunAsync(bgTask, cancelSource.Token).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                // RunAsync should NOT throw exception - just raise an event
                // this is here for safety and to ensure we don't kill everything, ever
                _logger.Error(_logPrefix + "Task runner exception.", e);
            }
        }
        finally
        {
            // clear the field under the lock so that no one can pick the source up anymore,
            // then dispose it: a linked source stays registered with its parent token until disposed
            lock (_locker)
            {
                _cancelTokenSource = null;
            }
            cancelSource.Dispose();
        }
    }
}
/// <summary>
/// Gets the next background task to run.
/// </summary>
/// <returns>The next background task, or null when the runner has stopped.</returns>
/// <remarks>
/// When there is nothing to run, or the shutdown token is cancelled, the running state is
/// reset, the Stopped event is raised and null is returned. With the KeepAlive option the
/// call blocks until a task is available, the queue is completed, or shutdown is requested.
/// </remarks>
private T GetNextBackgroundTask()
{
    while (true)
    {
        // exit if cancelling
        if (_shutdownToken.IsCancellationRequested == false)
        {
            // try to get a task
            T bgTask = null;
            bool hasBgTask;

            if (_options.KeepAlive)
            {
                try
                {
                    // blocking - ends when the collection is completed
                    bgTask = _enumerator.MoveNext() ? _enumerator.Current : null;
                }
                catch (OperationCanceledException)
                {
                    // the consuming enumerator throws when its token is cancelled: treat it
                    // as "no task" so that the stop bookkeeping below still executes
                    bgTask = null;
                }
                hasBgTask = bgTask != null;
            }
            else
            {
                hasBgTask = _tasks.TryTake(out bgTask); // non-blocking
            }

            // exit if cancelling
            if (_shutdownToken.IsCancellationRequested == false && hasBgTask)
                return bgTask;
        }

        lock (_locker)
        {
            // a task may have been queued in the meantime: try again
            if (_shutdownToken.IsCancellationRequested == false && _tasks.Count > 0) continue;

            // nothing to do, or shutting down: stop
            _logger.Debug(_logPrefix + "Stopping");

            if (_options.PreserveRunningTask == false)
                _runningTask = null;
            _isRunning = false;
            _shutdownToken = CancellationToken.None;
            _enumerator = null;
        }

        // raised outside the lock
        OnEvent(Stopped, "Stopped");
        return null;
    }
}
/// <summary>
/// Waits for a latched background task to be released.
/// </summary>
/// <param name="bgTask">The background task.</param>
/// <param name="token">A token that cancels the wait.</param>
/// <returns>The task when it should run now; null when it should not run.</returns>
/// <remarks>A task that is not latched is returned immediately. Otherwise the call blocks until
/// the latch is released, the token is cancelled, or the runner's completed event is set.</remarks>
private T WaitForLatch(T bgTask, CancellationToken token)
{
    var latchedTask = bgTask as ILatchedBackgroundTask;
    var mustWait = latchedTask != null && latchedTask.IsLatched;
    if (mustWait == false)
        return bgTask;

    // WaitAny returns the array index of the handle that satisfied the wait
    var handles = new[] { latchedTask.Latch, token.WaitHandle, _completedEvent.WaitHandle };
    var signaled = WaitHandle.WaitAny(handles);

    // the latch was released: ok to run now
    if (signaled == 0)
        return bgTask;

    // cancellation
    if (signaled == 1)
        return null;

    // termination: run only if the task wants to run on shutdown, else get rid of it
    if (signaled == 2)
    {
        if (latchedTask.RunsOnShutdown)
            return bgTask;

        latchedTask.Dispose();
        return null;
    }

    throw new Exception("panic.");
}
/// <summary>
/// Runs one background task, raising the task starting, error and completed notifications around it.
/// </summary>
/// <param name="bgTask">The background task to run.</param>
/// <param name="token">The cancellation token handed to the task when it runs asynchronously.</param>
/// <returns>A task representing the execution of the background task.</returns>
/// <remarks>Every exception raised in here ends up in the outermost catch, where it is logged.</remarks>
private async Task RunAsync(T bgTask, CancellationToken token)
{
try
{
OnTaskStarting(new TaskEventArgs(bgTask));
try
{
try
{
// the task itself tells whether it wants to run asynchronously or synchronously
if (bgTask.IsAsync)
//configure await = false since we don't care about the context, we're on a background thread.
await bgTask.RunAsync(token).ConfigureAwait(false);
else
bgTask.Run();
}
finally // ensure we disposed - unless latched again ie wants to re-run
{
var lbgTask = bgTask as ILatchedBackgroundTask;
if (lbgTask == null || lbgTask.IsLatched == false)
bgTask.Dispose();
}
}
catch (Exception e)
{
// notify of the failure (of the run or of the dispose), then let the outer catch log it
OnTaskError(new TaskEventArgs(bgTask, e));
throw;
}
// only reached when the task ran and was disposed without throwing
OnTaskCompleted(new TaskEventArgs(bgTask));
}
catch (Exception ex)
{
// last line of defense: log, do not propagate
_logger.Error(_logPrefix + "Task has failed", ex);
}
}
#region Events
// NOTE(review): the generic argument lists of the handler types below appear to have been
// partially lost (unbalanced angle brackets) - confirm against the original file

// triggers when a background task starts
public event TypedEventHandler, TaskEventArgs> TaskStarting;
// triggers when a background task has completed
public event TypedEventHandler, TaskEventArgs> TaskCompleted;
// triggers when a background task throws
public event TypedEventHandler, TaskEventArgs> TaskError;
// triggers when a background task is cancelled
// NOTE(review): nothing in the visible code calls OnTaskCancelled - confirm who raises this event
public event TypedEventHandler, TaskEventArgs> TaskCancelled;
// triggers when the runner stops (but could start again if a task is added to it)
internal event TypedEventHandler, EventArgs> Stopped;
// triggers when the hosting environment requests that the runner terminates
// (raised only once, the first time Stop is invoked)
internal event TypedEventHandler, EventArgs> Terminating;
// triggers when the runner has terminated (no task can be added, no task is running)
internal event TypedEventHandler, EventArgs> Terminated;
// Raises an event that carries no data: forwards to the other OnEvent overload
// with EventArgs.Empty. Does nothing when the event has no handler.
// NOTE(review): the handler type's generic argument list looks truncated - confirm against the original file
private void OnEvent(TypedEventHandler, EventArgs> handler, string name)
{
if (handler == null) return;
OnEvent(handler, name, EventArgs.Empty);
}
// Raises an event with the runner as the sender. Does nothing when the event has no handler.
// An exception thrown by a handler is logged, using the event name, and is not propagated.
// NOTE(review): the method's type parameter and the handler type's generic argument list
// look truncated - confirm against the original file
private void OnEvent(TypedEventHandler, TArgs> handler, string name, TArgs e)
{
if (handler == null) return;
try
{
handler(this, e);
}
catch (Exception ex)
{
// a failing handler must not break the runner
_logger.Error(_logPrefix + name + " exception occurred", ex);
}
}
/// <summary>
/// Raises the TaskError event.
/// </summary>
/// <param name="e">The event arguments.</param>
protected virtual void OnTaskError(TaskEventArgs e)
{
OnEvent(TaskError, "TaskError", e);
}
/// <summary>
/// Raises the TaskStarting event.
/// </summary>
/// <param name="e">The event arguments.</param>
protected virtual void OnTaskStarting(TaskEventArgs e)
{
OnEvent(TaskStarting, "TaskStarting", e);
}
/// <summary>
/// Raises the TaskCompleted event.
/// </summary>
/// <param name="e">The event arguments.</param>
protected virtual void OnTaskCompleted(TaskEventArgs e)
{
OnEvent(TaskCompleted, "TaskCompleted", e);
}
/// <summary>
/// Raises the TaskCancelled event, then disposes the task carried by the event arguments.
/// </summary>
/// <param name="e">The event arguments.</param>
protected virtual void OnTaskCancelled(TaskEventArgs e)
{
OnEvent(TaskCancelled, "TaskCancelled", e);
//dispose it
e.Task.Dispose();
}
#endregion
#region IDisposable
// serializes disposal, so that resources are released only once
private readonly object _disposalLocker = new object();
/// <summary>
/// Gets a value indicating whether the runner has been disposed.
/// </summary>
public bool IsDisposed { get; private set; }
// Finalizer: invokes Dispose(false), which returns without doing anything
// when it is not invoked from an explicit Dispose().
~BackgroundTaskRunner()
{
Dispose(false);
}
/// <summary>
/// Disposes the runner, and suppresses its finalization.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Releases the runner's resources, once.
/// </summary>
/// <param name="disposing">true when invoked from <c>Dispose()</c>; false when invoked
/// from the finalizer, in which case nothing is done.</param>
protected virtual void Dispose(bool disposing)
{
    // nothing to do from the finalizer, nor when already disposed
    if (disposing == false || IsDisposed)
        return;

    lock (_disposalLocker)
    {
        // check again now that we own the lock: another thread may have disposed meanwhile
        if (IsDisposed == false)
        {
            DisposeResources();
            IsDisposed = true;
        }
    }
}
/// <summary>
/// Releases the runner's resources: requests a forced shutdown, without waiting for it to complete.
/// </summary>
protected virtual void DisposeResources()
{
// just make sure we eventually go down
Shutdown(true, false);
}
#endregion
/// <summary>
/// Requests a registered object to unregister.
/// </summary>
/// <param name="immediate">true to indicate the registered object should unregister from the hosting
/// environment before returning; otherwise, false.</param>
/// <remarks>
/// "When the application manager needs to stop a registered object, it will call the Stop method."
/// The application manager will call the Stop method to ask a registered object to unregister. During
/// processing of the Stop method, the registered object must call the HostingEnvironment.UnregisterObject method.
/// </remarks>
public void Stop(bool immediate)
{
    // the first time the hosting environment requests that the runner terminates,
    // raise the Terminating event - that could be used to prevent any process that
    // would expect the runner to be available from starting.
    var onTerminating = false;
    lock (_locker)
    {
        if (_terminating == false)
        {
            _terminating = true;
            _logger.Info(_logPrefix + "Terminating" + (immediate ? " (immediate)" : ""));
            onTerminating = true;
        }
    }

    // raised outside the lock
    if (onTerminating)
        OnEvent(Terminating, "Terminating");

    if (immediate == false)
    {
        // The Stop method is first called with the immediate parameter set to false. The object can either complete
        // processing, call the UnregisterObject method, and then return or it can return immediately and complete
        // processing asynchronously before calling the UnregisterObject method.
        _logger.Info(_logPrefix + "Waiting for tasks to complete");
        Shutdown(false, false); // do not accept any more tasks, flush the queue, do not wait

        // raise the completed event only after the running threading task has completed
        lock (_locker)
        {
            if (_runningTask != null)
                _runningTask.ContinueWith(_ => Terminate(false));
            else
                Terminate(false);
        }
    }
    else
    {
        // If the registered object does not complete processing before the application manager's time-out
        // period expires, the Stop method is called again with the immediate parameter set to true. When the
        // immediate parameter is true, the registered object must call the UnregisterObject method before returning;
        // otherwise, its registration will be removed by the application manager.
        _logger.Info(_logPrefix + "Cancelling tasks");
        try
        {
            Shutdown(true, true); // cancel all tasks, wait for the current one to end
        }
        finally
        {
            // waiting on a cancelled or faulted threading task throws: whatever happens,
            // we must terminate, ie unregister, before returning
            Terminate(true);
        }
    }
}
// Completes the termination of the runner: unregisters it from the hosting environment,
// flags it as terminated, logs, raises the Terminated event and completes the awaitable
// completion source, if any.
// called by Stop either immediately or eventually
private void Terminate(bool immediate)
{
    // signal the environment we have terminated
    HostingEnvironment.UnregisterObject(this);

    TaskCompletionSource terminatedSource;
    lock (_locker)
    {
        _terminated = true;
        terminatedSource = _terminatedSource;
    }

    // log
    _logger.Info(_logPrefix + "Tasks " + (immediate ? "cancelled" : "completed") + ", terminated");

    // raise the Terminated event
    OnEvent(Terminated, "Terminated");

    // complete the awaitable completion source, if any
    // the source may already be completed, by the TerminatedAwaitable property once
    // _terminated is set, or by a previous termination - TrySetResult is a no-op in that
    // case whereas SetResult would throw InvalidOperationException
    if (terminatedSource != null)
        terminatedSource.TrySetResult(0);
}
}
}