using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Web.Hosting;
using Umbraco.Core.Events;
using Umbraco.Core.Logging;
namespace Umbraco.Web.Scheduling
{
// exists for logging purposes
internal class BackgroundTaskRunner
{ }
///
/// Manages a queue of tasks of type and runs them in the background.
///
/// The type of the managed tasks.
/// The task runner is web-aware and will ensure that it shuts down correctly when the AppDomain
/// shuts down (ie is unloaded).
internal class BackgroundTaskRunner : BackgroundTaskRunner, IBackgroundTaskRunner
where T : class, IBackgroundTask
{
private readonly string _logPrefix;
private readonly BackgroundTaskRunnerOptions _options;
private readonly BlockingCollection _tasks = new BlockingCollection();
private readonly object _locker = new object();
// that event is used to stop the pump when it is alive and waiting
// on a latched task - so it waits on the latch, the cancellation token,
// and the completed event
private readonly ManualResetEventSlim _completedEvent = new ManualResetEventSlim(false);
// in various places we are testing these vars outside a lock, so make them volatile
private volatile bool _isRunning; // is running
private volatile bool _isCompleted; // does not accept tasks anymore, may still be running
private Task _runningTask;
private CancellationTokenSource _tokenSource;
private bool _terminating; // ensures we raise that event only once
private bool _terminated; // remember we've terminated
private TaskCompletionSource _terminatedSource; // awaitable source
internal event TypedEventHandler, TaskEventArgs> TaskError;
internal event TypedEventHandler, TaskEventArgs> TaskStarting;
internal event TypedEventHandler, TaskEventArgs> TaskCompleted;
internal event TypedEventHandler, TaskEventArgs> TaskCancelled;
// triggers when the runner stops (but could start again if a task is added to it)
internal event TypedEventHandler, EventArgs> Stopped;
// triggers when the hosting environment requests that the runner terminates
internal event TypedEventHandler, EventArgs> Terminating;
// triggers when the runner terminates (no task can be added, no task is running)
internal event TypedEventHandler, EventArgs> Terminated;
///
/// Initializes a new instance of the class.
///
public BackgroundTaskRunner()
: this(typeof (T).FullName, new BackgroundTaskRunnerOptions())
{ }
///
/// Initializes a new instance of the class.
///
/// The name of the runner.
public BackgroundTaskRunner(string name)
: this(name, new BackgroundTaskRunnerOptions())
{ }
///
/// Initializes a new instance of the class with a set of options.
///
/// The set of options.
public BackgroundTaskRunner(BackgroundTaskRunnerOptions options)
: this(typeof (T).FullName, options)
{ }
///
/// Initializes a new instance of the class with a set of options.
///
/// The name of the runner.
/// The set of options.
public BackgroundTaskRunner(string name, BackgroundTaskRunnerOptions options)
{
if (options == null) throw new ArgumentNullException("options");
_options = options;
_logPrefix = "[" + name + "] ";
HostingEnvironment.RegisterObject(this);
if (options.AutoStart)
StartUp();
}
///
/// Gets the number of tasks in the queue.
///
public int TaskCount
{
get { return _tasks.Count; }
}
///
/// Gets a value indicating whether a task is currently running.
///
public bool IsRunning
{
get { return _isRunning; }
}
///
/// Gets a value indicating whether the runner has completed and cannot accept tasks anymore.
///
public bool IsCompleted
{
get { return _isCompleted; }
}
///
/// Gets the running task as an immutable object.
///
/// There is no running task.
///
/// Unless the AutoStart option is true, there will be no running task until
/// a background task is added to the queue. Unless the KeepAlive option is true, there
/// will be no running task when the queue is empty.
///
public ThreadingTaskImmutable CurrentThreadingTask
{
get
{
lock (_locker)
{
if (_runningTask == null)
throw new InvalidOperationException("There is no current Threading.Task.");
return new ThreadingTaskImmutable(_runningTask);
}
}
}
///
/// Gets an awaitable used to await the runner running operation.
///
/// An awaitable instance.
/// Used to wait until the runner is no longer running (IsRunning == false),
/// though the runner could be started again afterwards by adding tasks to it.
public ThreadingTaskImmutable StoppedAwaitable
{
get
{
lock (_locker)
{
var task = _runningTask ?? Task.FromResult(0);
return new ThreadingTaskImmutable(task);
}
}
}
///
/// Gets an awaitable used to await the runner.
///
/// An awaitable instance.
/// Used to wait until the runner is terminated.
public ThreadingTaskImmutable TerminatedAwaitable
{
get
{
lock (_locker)
{
if (_terminatedSource == null && _terminated == false)
_terminatedSource = new TaskCompletionSource();
var task = _terminatedSource == null ? Task.FromResult(0) : _terminatedSource.Task;
return new ThreadingTaskImmutable(task);
}
}
}
///
/// Adds a task to the queue.
///
/// The task to add.
/// The task runner has completed.
public void Add(T task)
{
lock (_locker)
{
if (_isCompleted)
throw new InvalidOperationException("The task runner has completed.");
// add task
LogHelper.Debug(_logPrefix + "Task added {0}", task.GetType);
_tasks.Add(task);
// start
StartUpLocked();
}
}
///
/// Tries to add a task to the queue.
///
/// The task to add.
/// true if the task could be added to the queue; otherwise false.
/// Returns false if the runner is completed.
public bool TryAdd(T task)
{
lock (_locker)
{
if (_isCompleted) return false;
// add task
LogHelper.Debug(_logPrefix + "Task added {0}", task.GetType);
_tasks.Add(task);
// start
StartUpLocked();
return true;
}
}
///
/// Starts the tasks runner, if not already running.
///
/// Is invoked each time a task is added, to ensure it is going to be processed.
/// The task runner has completed.
public void StartUp()
{
if (_isRunning) return;
lock (_locker)
{
if (_isCompleted)
throw new InvalidOperationException("The task runner has completed.");
StartUpLocked();
}
}
///
/// Starts the tasks runner, if not already running.
///
/// Must be invoked within lock(_locker) and with _isCompleted being false.
private void StartUpLocked()
{
// double check
if (_isRunning) return;
_isRunning = true;
// create a new token source since this is a new process
_tokenSource = new CancellationTokenSource();
_runningTask = PumpIBackgroundTasks(Task.Factory, _tokenSource.Token);
LogHelper.Debug(_logPrefix + "Starting");
}
///
/// Shuts the taks runner down.
///
/// True for force the runner to stop.
/// True to wait until the runner has stopped.
/// If is false, no more tasks can be queued but all queued tasks
/// will run. If it is true, then only the current one (if any) will end and no other task will run.
public void Shutdown(bool force, bool wait)
{
lock (_locker)
{
_isCompleted = true; // do not accept new tasks
if (_isRunning == false) return; // done already
}
// try to be nice
// assuming multiple threads can do these without problems
_completedEvent.Set();
_tasks.CompleteAdding();
if (force)
{
// we must bring everything down, now
Thread.Sleep(100); // give time to CompleteAdding()
lock (_locker)
{
// was CompleteAdding() enough?
if (_isRunning == false) return;
}
// try to cancel running async tasks (cannot do much about sync tasks)
// break delayed tasks delay
// truncate running queues
_tokenSource.Cancel(false); // false is the default
}
// tasks in the queue will be executed...
if (wait == false) return;
_runningTask.Wait(); // wait for whatever is running to end...
}
///
/// Runs background tasks for as long as there are background tasks in the queue, with an asynchronous operation.
///
/// The supporting .
/// A cancellation token.
/// The asynchronous operation.
private Task PumpIBackgroundTasks(TaskFactory factory, CancellationToken token)
{
var taskSource = new TaskCompletionSource