using System; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using System.Web.Hosting; using Umbraco.Core; using Umbraco.Core.Events; using Umbraco.Core.Logging; using LightInject; namespace Umbraco.Web.Scheduling { /// /// Manages a queue of tasks and runs them in the background. /// /// This class exists for logging purposes - the one you want to use is BackgroundTaskRunner{T}. public abstract class BackgroundTaskRunner { } /// /// Manages a queue of tasks of type and runs them in the background. /// /// The type of the managed tasks. /// The task runner is web-aware and will ensure that it shuts down correctly when the AppDomain /// shuts down (ie is unloaded). public class BackgroundTaskRunner : BackgroundTaskRunner, IBackgroundTaskRunner where T : class, IBackgroundTask { // do not remove this comment! // // if you plan to do anything on this class, first go and read // http://blog.stephencleary.com/2012/12/dont-block-in-asynchronous-code.html // http://stackoverflow.com/questions/19481964/calling-taskcompletionsource-setresult-in-a-non-blocking-manner // http://stackoverflow.com/questions/21225361/is-there-anything-like-asynchronous-blockingcollectiont // and more, and more, and more // and remember: async is hard private readonly string _logPrefix; private readonly BackgroundTaskRunnerOptions _options; private readonly ILogger _logger; private readonly object _locker = new object(); private readonly BufferBlock _tasks = new BufferBlock(new DataflowBlockOptions()); // in various places we are testing these vars outside a lock, so make them volatile private volatile bool _isRunning; // is running private volatile bool _completed; // does not accept tasks anymore, may still be running private Task _runningTask; // the threading task that is currently executing background tasks private CancellationTokenSource _shutdownTokenSource; // used to cancel everything and shutdown private CancellationTokenSource _cancelTokenSource; // used to cancel the current task private CancellationToken _shutdownToken; private bool _terminating; // ensures we raise that event only once private bool _terminated; // remember we've terminated private readonly TaskCompletionSource _terminatedSource = new TaskCompletionSource(); // enable awaiting termination // fixme - this is temp // at the moment MainDom is internal so we have to find a way to hook into it - temp public class MainDomHook { private MainDomHook(MainDom mainDom, Action install, Action release) { MainDom = mainDom; Install = install; Release = release; } internal MainDom MainDom { get; } public Action Install { get; } public Action Release { get; } public static MainDomHook Create(Action install, Action release) { return new MainDomHook(Core.Composing.Current.Container.GetInstance(), install, release); } public static MainDomHook CreateForTest(Action install, Action release) { return new MainDomHook(null, install, release); } public bool Register() { if (MainDom != null) return MainDom.Register(Install, Release); // tests Install?.Invoke(); return true; } } /// /// Initializes a new instance of the class. /// /// A logger. /// An optional main domain hook. public BackgroundTaskRunner(ILogger logger, MainDomHook hook = null) : this(typeof(T).FullName, new BackgroundTaskRunnerOptions(), logger, hook) { } /// /// Initializes a new instance of the class. /// /// The name of the runner. /// A logger. /// An optional main domain hook. public BackgroundTaskRunner(string name, ILogger logger, MainDomHook hook = null) : this(name, new BackgroundTaskRunnerOptions(), logger, hook) { } /// /// Initializes a new instance of the class with a set of options. /// /// The set of options. /// A logger. /// An optional main domain hook. public BackgroundTaskRunner(BackgroundTaskRunnerOptions options, ILogger logger, MainDomHook hook = null) : this(typeof(T).FullName, options, logger, hook) { } /// /// Initializes a new instance of the class with a set of options. /// /// The name of the runner. /// The set of options. /// A logger. /// An optional main domain hook. public BackgroundTaskRunner(string name, BackgroundTaskRunnerOptions options, ILogger logger, MainDomHook hook = null) { if (options == null) throw new ArgumentNullException(nameof(options)); if (logger == null) throw new ArgumentNullException(nameof(logger)); _options = options; _logPrefix = "[" + name + "] "; _logger = logger; if (options.Hosted) HostingEnvironment.RegisterObject(this); if (hook != null) _completed = _terminated = hook.Register() == false; if (options.AutoStart && _terminated == false) StartUp(); } /// /// Gets the number of tasks in the queue. /// public int TaskCount => _tasks.Count; /// /// Gets a value indicating whether a threading task is currently running. /// public bool IsRunning => _isRunning; /// /// Gets a value indicating whether the runner has completed and cannot accept tasks anymore. /// public bool IsCompleted => _completed; /// /// Gets the running threading task as an immutable awaitable. /// /// There is no running task. /// /// Unless the AutoStart option is true, there will be no current threading task until /// a background task is added to the queue, and there will be no current threading task /// when the queue is empty. In which case this method returns null. /// The returned value can be awaited and that is all (eg no continuation). /// internal ThreadingTaskImmutable CurrentThreadingTask { get { lock (_locker) { return _runningTask == null ? null : new ThreadingTaskImmutable(_runningTask); } } } /// /// Gets an awaitable used to await the runner running operation. /// /// An awaitable instance. /// Used to wait until the runner is no longer running (IsRunning == false), /// though the runner could be started again afterwards by adding tasks to it. If /// the runner is not running, returns a completed awaitable. public ThreadingTaskImmutable StoppedAwaitable { get { lock (_locker) { var task = _runningTask ?? Task.FromResult(0); return new ThreadingTaskImmutable(task); } } } /// /// Gets an awaitable object that can be used to await for the runner to terminate. /// /// An awaitable object. /// /// Used to wait until the runner has terminated. /// This is for unit tests and should not be used otherwise. In most cases when the runner /// has terminated, the application domain is going down and it is not the right time to do things. /// internal ThreadingTaskImmutable TerminatedAwaitable { get { lock (_locker) { return new ThreadingTaskImmutable(_terminatedSource.Task); } } } /// /// Adds a task to the queue. /// /// The task to add. /// The task runner has completed. public void Add(T task) { lock (_locker) { if (_completed) throw new InvalidOperationException("The task runner has completed."); // add task _logger.Debug("{LogPrefix} Task Added {TaskType}", _logPrefix , task.GetType().FullName); _tasks.Post(task); // start StartUpLocked(); } } /// /// Tries to add a task to the queue. /// /// The task to add. /// true if the task could be added to the queue; otherwise false. /// Returns false if the runner is completed. public bool TryAdd(T task) { lock (_locker) { if (_completed) { _logger.Debug("{LogPrefix} Task cannot be added {TaskType}, the task runner has already shutdown", _logPrefix, task.GetType().FullName); return false; } // add task _logger.Debug("{LogPrefix} Task added {TaskType}", _logPrefix, task.GetType().FullName); _tasks.Post(task); // start StartUpLocked(); return true; } } /// /// Cancels to current task, if any. /// /// Has no effect if the task runs synchronously, or does not want to cancel. public void CancelCurrentBackgroundTask() { lock (_locker) { if (_completed) throw new InvalidOperationException("The task runner has completed."); _cancelTokenSource?.Cancel(); } } /// /// Starts the tasks runner, if not already running. /// /// Is invoked each time a task is added, to ensure it is going to be processed. /// The task runner has completed. internal void StartUp() { if (_isRunning) return; lock (_locker) { if (_completed) throw new InvalidOperationException("The task runner has completed."); StartUpLocked(); } } /// /// Starts the tasks runner, if not already running. /// /// Must be invoked within lock(_locker) and with _isCompleted being false. private void StartUpLocked() { // double check if (_isRunning) return; _isRunning = true; // create a new token source since this is a new process _shutdownTokenSource = new CancellationTokenSource(); _shutdownToken = _shutdownTokenSource.Token; _runningTask = Task.Run(async () => await Pump().ConfigureAwait(false), _shutdownToken); _logger.Debug("{LogPrefix} Starting", _logPrefix); } /// /// Shuts the taks runner down. /// /// True for force the runner to stop. /// True to wait until the runner has stopped. /// If is false, no more tasks can be queued but all queued tasks /// will run. If it is true, then only the current one (if any) will end and no other task will run. public void Shutdown(bool force, bool wait) { lock (_locker) { _completed = true; // do not accept new tasks if (_isRunning == false) return; // done already } // complete the queue // will stop waiting on the queue or on a latch _tasks.Complete(); if (force) { // we must bring everything down, now Thread.Sleep(100); // give time to Complete() lock (_locker) { // was Complete() enough? if (_isRunning == false) return; } // try to cancel running async tasks (cannot do much about sync tasks) // break latched tasks // stop processing the queue _shutdownTokenSource.Cancel(false); // false is the default } // tasks in the queue will be executed... if (wait == false) return; _runningTask?.Wait(); // wait for whatever is running to end... } private async Task Pump() { while (true) { // get the next task // if it returns null the runner is going down, stop var bgTask = await GetNextBackgroundTask(_shutdownToken); if (bgTask == null) return; // set a cancellation source so that the current task can be cancelled // link from _shutdownToken so that we can use _cancelTokenSource for both lock (_locker) { _cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_shutdownToken); } // wait for latch should return the task // if it returns null it's either that the task has been cancelled // or the whole runner is going down - in both cases, continue, // and GetNextBackgroundTask will take care of shutdowns bgTask = await WaitForLatch(bgTask, _cancelTokenSource.Token); if (bgTask == null) continue; // executes & be safe - RunAsync should NOT throw but only raise an event, // but... just make sure we never ever take everything down try { await RunAsync(bgTask, _cancelTokenSource.Token).ConfigureAwait(false); } catch (Exception ex) { _logger.Error(ex, "{LogPrefix} Task runner exception", _logPrefix); } // done lock (_locker) { _cancelTokenSource = null; } } } // gets the next background task from the buffer private async Task GetNextBackgroundTask(CancellationToken token) { while (true) { var task = await GetNextBackgroundTask2(token); if (task != null) return task; lock (_locker) { // deal with race condition if (_shutdownToken.IsCancellationRequested == false && _tasks.Count > 0) continue; // if we really have nothing to do, stop _logger.Debug("{LogPrefix} Stopping", _logPrefix); if (_options.PreserveRunningTask == false) _runningTask = null; _isRunning = false; _shutdownToken = CancellationToken.None; } OnEvent(Stopped, "Stopped"); return null; } } private async Task GetNextBackgroundTask2(CancellationToken shutdownToken) { // exit if cancelling if (shutdownToken.IsCancellationRequested) return null; // if keepalive is false then don't block, exit if there is // no task in the buffer - yes, there is a race cond, which // we'll take care of if (_options.KeepAlive == false && _tasks.Count == 0) return null; try { // A Task that informs of whether and when more output is available. If, when the // task completes, its Result is true, more output is available in the source (though another // consumer of the source may retrieve the data). If it returns false, more output is not // and will never be available, due to the source completing prior to output being available. var output = await _tasks.OutputAvailableAsync(shutdownToken); // block until output or cancelled if (output == false) return null; } catch (TaskCanceledException) { return null; } try { // A task that represents the asynchronous receive operation. When an item value is successfully // received from the source, the returned task is completed and its Result returns the received // value. If an item value cannot be retrieved because the source is empty and completed, an // InvalidOperationException exception is thrown in the returned task. // the source cannot be empty *and* completed here - we know we have output return await _tasks.ReceiveAsync(shutdownToken); } catch (TaskCanceledException) { return null; } } // if bgTask is not a latched background task, or if it is not latched, returns immediately // else waits for the latch, taking care of completion and shutdown and whatnot private async Task WaitForLatch(T bgTask, CancellationToken token) { var latched = bgTask as ILatchedBackgroundTask; if (latched == null || latched.IsLatched == false) return bgTask; // support cancelling awaiting // read https://github.com/dotnet/corefx/issues/2704 // read http://stackoverflow.com/questions/27238232/how-can-i-cancel-task-whenall var tokenTaskSource = new TaskCompletionSource(); token.Register(s => ((TaskCompletionSource)s).SetResult(true), tokenTaskSource); // returns the task that completed // - latched.Latch completes when the latch releases // - _tasks.Completion completes when the runner completes // - tokenTaskSource.Task completes when this task, or the whole runner, is cancelled var task = await Task.WhenAny(latched.Latch, _tasks.Completion, tokenTaskSource.Task); // ok to run now if (task == latched.Latch) return bgTask; // if shutting down, return the task only if it runs on shutdown if (_shutdownToken.IsCancellationRequested == false && latched.RunsOnShutdown) return bgTask; // else, either it does not run on shutdown or it's been cancelled, dispose latched.Dispose(); return null; } // runs the background task, taking care of shutdown (as far as possible - cannot abort // a non-async Run for example, so we'll do our best) private async Task RunAsync(T bgTask, CancellationToken token) { try { OnTaskStarting(new TaskEventArgs(bgTask)); try { try { if (bgTask.IsAsync) //configure await = false since we don't care about the context, we're on a background thread. await bgTask.RunAsync(token).ConfigureAwait(false); else bgTask.Run(); } finally // ensure we disposed - unless latched again ie wants to re-run { var lbgTask = bgTask as ILatchedBackgroundTask; if (lbgTask == null || lbgTask.IsLatched == false) bgTask.Dispose(); } } catch (Exception e) { OnTaskError(new TaskEventArgs(bgTask, e)); throw; } OnTaskCompleted(new TaskEventArgs(bgTask)); } catch (Exception ex) { _logger.Error(ex, "{LogPrefix} Task has failed", _logPrefix); } } #region Events // triggers when a background task starts public event TypedEventHandler, TaskEventArgs> TaskStarting; // triggers when a background task has completed public event TypedEventHandler, TaskEventArgs> TaskCompleted; // triggers when a background task throws public event TypedEventHandler, TaskEventArgs> TaskError; // triggers when a background task is cancelled public event TypedEventHandler, TaskEventArgs> TaskCancelled; // triggers when the runner stops (but could start again if a task is added to it) internal event TypedEventHandler, EventArgs> Stopped; // triggers when the hosting environment requests that the runner terminates internal event TypedEventHandler, EventArgs> Terminating; // triggers when the runner has terminated (no task can be added, no task is running) internal event TypedEventHandler, EventArgs> Terminated; private void OnEvent(TypedEventHandler, EventArgs> handler, string name) { if (handler == null) return; OnEvent(handler, name, EventArgs.Empty); } private void OnEvent(TypedEventHandler, TArgs> handler, string name, TArgs e) { if (handler == null) return; try { handler(this, e); } catch (Exception ex) { _logger.Error(ex, "{LogPrefix} {Name} exception occurred", _logPrefix, name); } } protected virtual void OnTaskError(TaskEventArgs e) { OnEvent(TaskError, "TaskError", e); } protected virtual void OnTaskStarting(TaskEventArgs e) { OnEvent(TaskStarting, "TaskStarting", e); } protected virtual void OnTaskCompleted(TaskEventArgs e) { OnEvent(TaskCompleted, "TaskCompleted", e); } protected virtual void OnTaskCancelled(TaskEventArgs e) { OnEvent(TaskCancelled, "TaskCancelled", e); // dispose it e.Task.Dispose(); } #endregion #region IDisposable private readonly object _disposalLocker = new object(); public bool IsDisposed { get; private set; } ~BackgroundTaskRunner() { Dispose(false); } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { if (IsDisposed || disposing == false) return; lock (_disposalLocker) { if (IsDisposed) return; DisposeResources(); IsDisposed = true; } } protected virtual void DisposeResources() { // just make sure we eventually go down Shutdown(true, false); } #endregion /// /// Requests a registered object to unregister. /// /// true to indicate the registered object should unregister from the hosting /// environment before returning; otherwise, false. /// /// "When the application manager needs to stop a registered object, it will call the Stop method." /// The application manager will call the Stop method to ask a registered object to unregister. During /// processing of the Stop method, the registered object must call the HostingEnvironment.UnregisterObject method. /// public void Stop(bool immediate) { // the first time the hosting environment requests that the runner terminates, // raise the Terminating event - that could be used to prevent any process that // would expect the runner to be available from starting. var onTerminating = false; lock (_locker) { if (_terminating == false) { _terminating = true; _logger.Info("{LogPrefix} Terminating {Immediate}", _logPrefix, immediate ? immediate.ToString() : string.Empty); onTerminating = true; } } if (onTerminating) OnEvent(Terminating, "Terminating"); if (immediate == false) { // The Stop method is first called with the immediate parameter set to false. The object can either complete // processing, call the UnregisterObject method, and then return or it can return immediately and complete // processing asynchronously before calling the UnregisterObject method. _logger.Info("{LogPrefix} Waiting for tasks to complete", _logPrefix); Shutdown(false, false); // do not accept any more tasks, flush the queue, do not wait // raise the completed event only after the running threading task has completed lock (_locker) { if (_runningTask != null) _runningTask.ContinueWith(_ => Terminate(false)); else Terminate(false); } } else { // If the registered object does not complete processing before the application manager's time-out // period expires, the Stop method is called again with the immediate parameter set to true. When the // immediate parameter is true, the registered object must call the UnregisterObject method before returning; // otherwise, its registration will be removed by the application manager. _logger.Info("{LogPrefix} Cancelling tasks", _logPrefix); Shutdown(true, true); // cancel all tasks, wait for the current one to end Terminate(true); } } // called by Stop either immediately or eventually private void Terminate(bool immediate) { // signal the environment we have terminated // log // raise the Terminated event // complete the awaitable completion source, if any HostingEnvironment.UnregisterObject(this); TaskCompletionSource terminatedSource; lock (_locker) { _terminated = true; terminatedSource = _terminatedSource; } _logger.Info("{LogPrefix} Tasks {TaskStatus}, terminated", _logPrefix, immediate ? "cancelled" : "completed"); OnEvent(Terminated, "Terminated"); terminatedSource.SetResult(0); } } }