using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.Logging;
using Umbraco.Cms.Core.Events;
using Umbraco.Cms.Core.Hosting;
using Umbraco.Cms.Core.Runtime;

namespace Umbraco.Web.Scheduling
{
    /// <summary>
    /// Manages a queue of tasks and runs them in the background.
    /// </summary>
    /// <remarks>This class exists for logging purposes - the one you want to use is BackgroundTaskRunner{T}.</remarks>
    public abstract class BackgroundTaskRunner
    {
        /// <summary>
        /// Represents a MainDom hook.
        /// </summary>
        public class MainDomHook
        {
            /// <summary>
            /// Initializes a new instance of the <see cref="MainDomHook"/> class.
            /// </summary>
            /// <param name="mainDom">The <see cref="IMainDom"/> object (may be null, e.g. in tests).</param>
            /// <param name="install">A method to execute when hooking into the main domain.</param>
            /// <param name="release">A method to execute when the main domain releases.</param>
            public MainDomHook(IMainDom mainDom, Action install, Action release)
            {
                MainDom = mainDom;
                Install = install;
                Release = release;
            }

            /// <summary>
            /// Gets the <see cref="IMainDom"/> object.
            /// </summary>
            public IMainDom MainDom { get; }

            /// <summary>
            /// Gets the method to execute when hooking into the main domain.
            /// </summary>
            public Action Install { get; }

            /// <summary>
            /// Gets the method to execute when the main domain releases.
            /// </summary>
            public Action Release { get; }

            /// <summary>
            /// Registers the install/release callbacks with the main domain.
            /// </summary>
            /// <returns>
            /// The result of <c>MainDom.Register</c> when a main domain is available; otherwise
            /// invokes <see cref="Install"/> directly (if any) and returns true.
            /// </returns>
            internal bool Register()
            {
                if (MainDom != null)
                {
                    return MainDom.Register(Install, Release);
                }

                // tests: no main domain, run the install callback straight away
                Install?.Invoke();
                return true;
            }
        }
    }

    /// <summary>
    /// Manages a queue of tasks of type <typeparamref name="T"/> and runs them in the background.
    /// </summary>
    /// <typeparam name="T">The type of the managed tasks.</typeparam>
    /// <remarks>The task runner is web-aware and will ensure that it shuts down correctly when the AppDomain
    /// shuts down (ie is unloaded).</remarks>
    public class BackgroundTaskRunner<T> : BackgroundTaskRunner, IBackgroundTaskRunner<T>
        where T : class, IBackgroundTask
    {
        // do not remove this comment!
        //
        // if you plan to do anything on this class, first go and read
        //   http://blog.stephencleary.com/2012/12/dont-block-in-asynchronous-code.html
        //   http://stackoverflow.com/questions/19481964/calling-taskcompletionsource-setresult-in-a-non-blocking-manner
        //   http://stackoverflow.com/questions/21225361/is-there-anything-like-asynchronous-blockingcollectiont
        // and more, and more, and more
        // and remember: async is hard

        private readonly string _logPrefix;
        private readonly BackgroundTaskRunnerOptions _options;
        private readonly ILogger<BackgroundTaskRunner<T>> _logger;
        private readonly IApplicationShutdownRegistry _applicationShutdownRegistry;
        private readonly object _locker = new object();

        private readonly BufferBlock<T> _tasks = new BufferBlock<T>(new DataflowBlockOptions());

        // in various places we are testing these vars outside a lock, so make them volatile
        private volatile bool _isRunning; // is running
        private volatile bool _completed; // does not accept tasks anymore, may still be running

        private Task _runningTask; // the threading task that is currently executing background tasks
        private CancellationTokenSource _shutdownTokenSource; // used to cancel everything and shutdown
        private CancellationTokenSource _cancelTokenSource; // used to cancel the current task
        private CancellationToken _shutdownToken;

        private bool _terminating; // ensures we raise that event only once
        private bool _terminated; // remember we've terminated

        // enable awaiting termination; continuations run asynchronously so that awaiters never
        // execute inline on the thread that completes the source
        private readonly TaskCompletionSource<int> _terminatedSource =
            new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

        /// <summary>
        /// Initializes a new instance of the <see cref="BackgroundTaskRunner{T}"/> class.
        /// </summary>
        /// <param name="logger">A logger.</param>
        /// <param name="applicationShutdownRegistry">The application shutdown registry</param>
        /// <param name="hook">An optional main domain hook.</param>
        public BackgroundTaskRunner(ILogger<BackgroundTaskRunner<T>> logger, IApplicationShutdownRegistry applicationShutdownRegistry, MainDomHook hook = null)
            : this(typeof(T).FullName, new BackgroundTaskRunnerOptions(), logger, applicationShutdownRegistry, hook)
        { }

        /// <summary>
        /// Initializes a new instance of the <see cref="BackgroundTaskRunner{T}"/> class.
        /// </summary>
        /// <param name="name">The name of the runner.</param>
        /// <param name="logger">A logger.</param>
        /// <param name="applicationShutdownRegistry">The application shutdown registry</param>
        /// <param name="hook">An optional main domain hook.</param>
        public BackgroundTaskRunner(string name, ILogger<BackgroundTaskRunner<T>> logger, IApplicationShutdownRegistry applicationShutdownRegistry, MainDomHook hook = null)
            : this(name, new BackgroundTaskRunnerOptions(), logger, applicationShutdownRegistry, hook)
        { }

        /// <summary>
        /// Initializes a new instance of the <see cref="BackgroundTaskRunner{T}"/> class with a set of options.
        /// </summary>
        /// <param name="options">The set of options.</param>
        /// <param name="logger">A logger.</param>
        /// <param name="applicationShutdownRegistry">The application shutdown registry</param>
        /// <param name="hook">An optional main domain hook.</param>
        public BackgroundTaskRunner(BackgroundTaskRunnerOptions options, ILogger<BackgroundTaskRunner<T>> logger, IApplicationShutdownRegistry applicationShutdownRegistry, MainDomHook hook = null)
            : this(typeof(T).FullName, options, logger, applicationShutdownRegistry, hook)
        { }

        /// <summary>
        /// Initializes a new instance of the <see cref="BackgroundTaskRunner{T}"/> class with a set of options.
        /// </summary>
        /// <param name="name">The name of the runner.</param>
        /// <param name="options">The set of options.</param>
        /// <param name="logger">A logger.</param>
        /// <param name="applicationShutdownRegistry">The application shutdown registry</param>
        /// <param name="hook">An optional main domain hook.</param>
        /// <exception cref="ArgumentNullException"><paramref name="options"/> or <paramref name="logger"/> is null.</exception>
        public BackgroundTaskRunner(string name, BackgroundTaskRunnerOptions options, ILogger<BackgroundTaskRunner<T>> logger, IApplicationShutdownRegistry applicationShutdownRegistry, MainDomHook hook = null)
        {
            _options = options ?? throw new ArgumentNullException(nameof(options));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _applicationShutdownRegistry = applicationShutdownRegistry;
            _logPrefix = "[" + name + "] ";

            if (options.Hosted)
            {
                _applicationShutdownRegistry.RegisterObject(this);
            }

            // if the hook cannot register, the runner is born completed and terminated
            if (hook != null)
            {
                _completed = _terminated = hook.Register() == false;
            }

            if (options.AutoStart && _terminated == false)
            {
                StartUp();
            }
        }

        /// <summary>
        /// Gets the number of tasks in the queue.
        /// </summary>
        public int TaskCount => _tasks.Count;

        /// <summary>
        /// Gets a value indicating whether a threading task is currently running.
        /// </summary>
        public bool IsRunning => _isRunning;

        /// <summary>
        /// Gets a value indicating whether the runner has completed and cannot accept tasks anymore.
        /// </summary>
        public bool IsCompleted => _completed;

        /// <summary>
        /// Gets the running threading task as an immutable awaitable.
        /// </summary>
        /// <remarks>
        /// <para>Unless the AutoStart option is true, there will be no current threading task until
        /// a background task is added to the queue, and there will be no current threading task
        /// when the queue is empty. In which case this property returns null.</para>
        /// <para>The returned value can be awaited and that is all (eg no continuation).</para>
        /// </remarks>
        internal ThreadingTaskImmutable CurrentThreadingTask
        {
            get
            {
                lock (_locker)
                {
                    return _runningTask == null ? null : new ThreadingTaskImmutable(_runningTask);
                }
            }
        }

        /// <summary>
        /// Gets an awaitable used to await the runner running operation.
        /// </summary>
        /// <returns>An awaitable instance.</returns>
        /// <remarks>Used to wait until the runner is no longer running (IsRunning == false),
        /// though the runner could be started again afterwards by adding tasks to it. If
        /// the runner is not running, returns a completed awaitable.</remarks>
        public ThreadingTaskImmutable StoppedAwaitable
        {
            get
            {
                lock (_locker)
                {
                    var task = _runningTask ?? Task.CompletedTask;
                    return new ThreadingTaskImmutable(task);
                }
            }
        }

        /// <summary>
        /// Gets an awaitable object that can be used to await for the runner to terminate.
        /// </summary>
        /// <returns>An awaitable object.</returns>
        /// <remarks>
        /// <para>Used to wait until the runner has terminated.</para>
        /// <para>
        /// The only time the runner will be terminated is by the Hosting Environment when the application is being shutdown.
        /// </para>
        /// </remarks>
        internal ThreadingTaskImmutable TerminatedAwaitable
        {
            get
            {
                lock (_locker)
                {
                    return new ThreadingTaskImmutable(_terminatedSource.Task);
                }
            }
        }

        /// <summary>
        /// Adds a task to the queue.
        /// </summary>
        /// <param name="task">The task to add.</param>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        /// <exception cref="InvalidOperationException">The task runner has completed.</exception>
        public void Add(T task)
        {
            // fail fast with a meaningful exception instead of a NullReferenceException inside the lock
            if (task == null)
            {
                throw new ArgumentNullException(nameof(task));
            }

            lock (_locker)
            {
                if (_completed)
                {
                    throw new InvalidOperationException("The task runner has completed.");
                }

                // add task
                _logger.LogDebug("{LogPrefix} Task Added {TaskType}", _logPrefix, task.GetType().FullName);
                _tasks.Post(task);

                // start
                StartUpLocked();
            }
        }

        /// <summary>
        /// Tries to add a task to the queue.
        /// </summary>
        /// <param name="task">The task to add.</param>
        /// <returns>true if the task could be added to the queue; otherwise false.</returns>
        /// <remarks>Returns false if the runner is completed.</remarks>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        public bool TryAdd(T task)
        {
            // fail fast with a meaningful exception instead of a NullReferenceException inside the lock
            if (task == null)
            {
                throw new ArgumentNullException(nameof(task));
            }

            lock (_locker)
            {
                if (_completed)
                {
                    _logger.LogDebug("{LogPrefix} Task cannot be added {TaskType}, the task runner has already shutdown", _logPrefix, task.GetType().FullName);
                    return false;
                }

                // add task
                _logger.LogDebug("{LogPrefix} Task added {TaskType}", _logPrefix, task.GetType().FullName);
                _tasks.Post(task);

                // start
                StartUpLocked();

                return true;
            }
        }

        /// <summary>
        /// Cancels to current task, if any.
        /// </summary>
        /// <remarks>Has no effect if the task runs synchronously, or does not want to cancel.</remarks>
        /// <exception cref="InvalidOperationException">The task runner has completed.</exception>
        public void CancelCurrentBackgroundTask()
        {
            lock (_locker)
            {
                if (_completed)
                {
                    throw new InvalidOperationException("The task runner has completed.");
                }

                _cancelTokenSource?.Cancel();
            }
        }

        /// <summary>
        /// Starts the tasks runner, if not already running.
        /// </summary>
        /// <remarks>Is invoked each time a task is added, to ensure it is going to be processed.</remarks>
        /// <exception cref="InvalidOperationException">The task runner has completed.</exception>
        internal void StartUp()
        {
            // cheap check outside the lock (the flag is volatile); re-checked in StartUpLocked
            if (_isRunning)
            {
                return;
            }

            lock (_locker)
            {
                if (_completed)
                {
                    throw new InvalidOperationException("The task runner has completed.");
                }

                StartUpLocked();
            }
        }

        /// <summary>
        /// Starts the tasks runner, if not already running.
        /// </summary>
        /// <remarks>Must be invoked within lock(_locker) and with _completed being false.</remarks>
        private void StartUpLocked()
        {
            // double check
            if (_isRunning)
            {
                return;
            }

            _isRunning = true;

            // create a new token source since this is a new process
            _shutdownTokenSource = new CancellationTokenSource();
            _shutdownToken = _shutdownTokenSource.Token;

            using (ExecutionContext.SuppressFlow()) // Do not flow AsyncLocal to the child thread
            {
                _runningTask = Task.Run(async () => await Pump().ConfigureAwait(false), _shutdownToken);
            }

            _logger.LogDebug("{LogPrefix} Starting", _logPrefix);
        }

        /// <summary>
        /// Shuts the tasks runner down.
        /// </summary>
        /// <param name="force">True for force the runner to stop.</param>
        /// <param name="wait">True to wait until the runner has stopped.</param>
        /// <remarks>If <paramref name="force"/> is false, no more tasks can be queued but all queued tasks
        /// will run. If it is true, then only the current one (if any) will end and no other task will run.</remarks>
        public void Shutdown(bool force, bool wait)
        {
            lock (_locker)
            {
                _completed = true; // do not accept new tasks
                if (_isRunning == false)
                {
                    return; // done already
                }
            }

            var hasTasks = TaskCount > 0;

            if (!force && hasTasks)
            {
                _logger.LogInformation("{LogPrefix} Waiting for tasks to complete", _logPrefix);
            }

            // complete the queue
            // will stop waiting on the queue or on a latch
            _tasks.Complete();

            if (force)
            {
                // we must bring everything down, now
                lock (_locker)
                {
                    // was Complete() enough?
                    // if _tasks.Complete() ended up triggering code to stop the runner and reset
                    // the _isRunning flag, then there's no need to initiate a cancel on the cancelation token.
                    if (_isRunning == false)
                    {
                        return;
                    }
                }

                // try to cancel running async tasks (cannot do much about sync tasks)
                // break latched tasks
                // stop processing the queue
                //
                // Shutdown can be entered concurrently (StopInitial may trigger StopImmediate twice), so
                // take ownership of the source atomically: exactly one caller cancels and disposes it and
                // nobody can call Cancel() on an already disposed source.
                var shutdownTokenSource = Interlocked.Exchange(ref _shutdownTokenSource, null);
                if (shutdownTokenSource != null)
                {
                    try
                    {
                        shutdownTokenSource.Cancel(false); // false is the default
                    }
                    finally
                    {
                        shutdownTokenSource.Dispose();
                    }
                }
            }

            // tasks in the queue will be executed...
            if (!wait)
            {
                return;
            }

            _runningTask?.Wait(CancellationToken.None); // wait for whatever is running to end...
        }

        /// <summary>
        /// The main loop: dequeues background tasks one at a time, waits for their latch (if any)
        /// and runs them, until <see cref="GetNextBackgroundTask"/> signals that the runner stops.
        /// </summary>
        private async Task Pump()
        {
            while (true)
            {
                // get the next task
                // if it returns null the runner is going down, stop
                var bgTask = await GetNextBackgroundTask(_shutdownToken).ConfigureAwait(false);
                if (bgTask == null)
                {
                    return;
                }

                // set a cancellation source so that the current task can be cancelled
                // link from _shutdownToken so that we can use _cancelTokenSource for both
                CancellationTokenSource cancelTokenSource;
                lock (_locker)
                {
                    cancelTokenSource = _cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_shutdownToken);
                }

                try
                {
                    // wait for latch should return the task
                    // if it returns null it's either that the task has been cancelled
                    // or the whole runner is going down - in both cases, continue,
                    // and GetNextBackgroundTask will take care of shutdowns
                    bgTask = await WaitForLatch(bgTask, cancelTokenSource.Token).ConfigureAwait(false);

                    if (bgTask != null)
                    {
                        // executes & be safe - RunAsync should NOT throw but only raise an event,
                        // but... just make sure we never ever take everything down
                        try
                        {
                            await RunAsync(bgTask, cancelTokenSource.Token).ConfigureAwait(false);
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError(ex, "{LogPrefix} Task runner exception", _logPrefix);
                        }
                    }
                }
                finally
                {
                    // done
                    lock (_locker)
                    {
                        // always dispose CancellationTokenSource when you are done using them
                        // https://lowleveldesign.org/2015/11/30/catch-in-cancellationtokensource/
                        cancelTokenSource.Dispose();
                        _cancelTokenSource = null;
                    }
                }
            }
        }

        /// <summary>
        /// Gets the next background task from the buffer.
        /// </summary>
        /// <param name="token">The shutdown token.</param>
        /// <returns>The next task, or null when the runner stops; in the latter case the running
        /// state is reset and the Stopped event is raised before returning.</returns>
        private async Task<T> GetNextBackgroundTask(CancellationToken token)
        {
            while (true)
            {
                var task = await GetNextBackgroundTask2(token).ConfigureAwait(false);
                if (task != null)
                {
                    return task;
                }

                lock (_locker)
                {
                    // deal with race condition: a task may have been posted between the
                    // "buffer is empty" check and taking the lock
                    if (_shutdownToken.IsCancellationRequested == false && TaskCount > 0)
                    {
                        continue;
                    }

                    // if we really have nothing to do, stop
                    _logger.LogDebug("{LogPrefix} Stopping", _logPrefix);

                    if (_options.PreserveRunningTask == false)
                    {
                        _runningTask = null;
                    }

                    _isRunning = false;
                    _shutdownToken = CancellationToken.None;
                }

                // raised outside the lock
                OnEvent(Stopped, "Stopped");
                return null;
            }
        }

        /// <summary>
        /// Tries to receive one task from the buffer.
        /// </summary>
        /// <param name="shutdownToken">The shutdown token.</param>
        /// <returns>The received task, or null if cancellation was requested, the buffer completed,
        /// or KeepAlive is false and the buffer is empty.</returns>
        private async Task<T> GetNextBackgroundTask2(CancellationToken shutdownToken)
        {
            // exit if canceling
            if (shutdownToken.IsCancellationRequested)
            {
                return null;
            }

            // if KeepAlive is false then don't block, exit if there is
            // no task in the buffer - yes, there is a race condition, which
            // we'll take care of
            if (_options.KeepAlive == false && TaskCount == 0)
            {
                return null;
            }

            try
            {
                // A Task<bool> that informs of whether and when more output is available. If, when the
                // task completes, its Result is true, more output is available in the source (though another
                // consumer of the source may retrieve the data). If it returns false, more output is not
                // and will never be available, due to the source completing prior to output being available.

                var output = await _tasks.OutputAvailableAsync(shutdownToken).ConfigureAwait(false); // block until output or cancelled
                if (output == false)
                {
                    return null;
                }
            }
            catch (OperationCanceledException)
            {
                // covers TaskCanceledException too (it derives from OperationCanceledException)
                return null;
            }

            try
            {
                // A task that represents the asynchronous receive operation. When an item value is successfully
                // received from the source, the returned task is completed and its Result returns the received
                // value. If an item value cannot be retrieved because the source is empty and completed, an
                // InvalidOperationException exception is thrown in the returned task.

                // the source cannot be empty *and* completed here - we know we have output
                return await _tasks.ReceiveAsync(shutdownToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // covers TaskCanceledException too (it derives from OperationCanceledException)
                return null;
            }
        }

        /// <summary>
        /// Waits for the latch of a latched background task.
        /// </summary>
        /// <param name="bgTask">The background task.</param>
        /// <param name="token">A token cancelled when the current task, or the whole runner, is cancelled.</param>
        /// <returns>
        /// <paramref name="bgTask"/> if it is not a latched task, is not latched, its latch released, or
        /// the runner is shutting down and the task runs on shutdown; otherwise null (the task is disposed).
        /// </returns>
        private async Task<T> WaitForLatch(T bgTask, CancellationToken token)
        {
            var latched = bgTask as ILatchedBackgroundTask;
            if (latched == null || latched.IsLatched == false)
            {
                return bgTask;
            }

            // support canceling awaiting
            // read https://github.com/dotnet/corefx/issues/2704
            // read http://stackoverflow.com/questions/27238232/how-can-i-cancel-task-whenall
            //
            // continuations must run asynchronously: the token may be cancelled by a thread that holds
            // _locker (see CancelCurrentBackgroundTask) and the pump must not resume inline on that thread
            var tokenTaskSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

            Task task;

            // dispose the registration once we are done waiting, whatever the outcome
            using (token.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tokenTaskSource))
            {
                // returns the task that completed
                // - latched.Latch completes when the latch releases
                // - _tasks.Completion completes when the runner completes
                // - tokenTaskSource.Task completes when this task, or the whole runner is cancelled
                task = await Task.WhenAny(latched.Latch, _tasks.Completion, tokenTaskSource.Task).ConfigureAwait(false);
            }

            // ok to run now
            if (task == latched.Latch)
            {
                return bgTask;
            }

            // we are shutting down if the _tasks.Complete(); was called or the shutdown token was cancelled
            var isShuttingDown = _shutdownToken.IsCancellationRequested || task == _tasks.Completion;

            // if shutting down, return the task only if it runs on shutdown
            if (isShuttingDown && latched.RunsOnShutdown)
            {
                return bgTask;
            }

            // else, either it does not run on shutdown or it's been cancelled, dispose
            latched.Dispose();
            return null;
        }

        /// <summary>
        /// Runs the background task, taking care of shutdown (as far as possible - cannot abort
        /// a non-async Run for example, so we'll do our best).
        /// </summary>
        /// <param name="bgTask">The background task to run.</param>
        /// <param name="token">The token passed to the task when it runs asynchronously.</param>
        /// <remarks>Exceptions are reported through the TaskError event and logged, not propagated.</remarks>
        private async Task RunAsync(T bgTask, CancellationToken token)
        {
            try
            {
                OnTaskStarting(new TaskEventArgs<T>(bgTask));

                try
                {
                    try
                    {
                        if (bgTask.IsAsync)
                        {
                            // configure await = false since we don't care about the context, we're on a background thread.
                            await bgTask.RunAsync(token).ConfigureAwait(false);
                        }
                        else
                        {
                            bgTask.Run();
                        }
                    }
                    finally // ensure we disposed - unless latched again ie wants to re-run
                    {
                        if (!(bgTask is ILatchedBackgroundTask lbgTask) || lbgTask.IsLatched == false)
                        {
                            bgTask.Dispose();
                        }
                    }
                }
                catch (Exception e)
                {
                    OnTaskError(new TaskEventArgs<T>(bgTask, e));
                    throw;
                }

                OnTaskCompleted(new TaskEventArgs<T>(bgTask));
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "{LogPrefix} Task has failed", _logPrefix);
            }
        }

        #region Events

        // triggers when a background task starts
        public event TypedEventHandler<BackgroundTaskRunner<T>, TaskEventArgs<T>> TaskStarting;

        // triggers when a background task has completed
        public event TypedEventHandler<BackgroundTaskRunner<T>, TaskEventArgs<T>> TaskCompleted;

        // triggers when a background task throws
        public event TypedEventHandler<BackgroundTaskRunner<T>, TaskEventArgs<T>> TaskError;

        // triggers when a background task is cancelled
        public event TypedEventHandler<BackgroundTaskRunner<T>, TaskEventArgs<T>> TaskCancelled;

        // triggers when the runner stops (but could start again if a task is added to it)
        internal event TypedEventHandler<BackgroundTaskRunner<T>, EventArgs> Stopped;

        // triggers when the hosting environment requests that the runner terminates
        internal event TypedEventHandler<BackgroundTaskRunner<T>, EventArgs> Terminating;

        // triggers when the hosting environment has terminated (no task can be added, no task is running)
        internal event TypedEventHandler<BackgroundTaskRunner<T>, EventArgs> Terminated;

        // raises an event that carries no data
        private void OnEvent(TypedEventHandler<BackgroundTaskRunner<T>, EventArgs> handler, string name)
        {
            OnEvent(handler, name, EventArgs.Empty);
        }

        // logs and raises an event; exceptions thrown by handlers are logged and swallowed so
        // that a faulty handler cannot take the runner down
        private void OnEvent<TArgs>(TypedEventHandler<BackgroundTaskRunner<T>, TArgs> handler, string name, TArgs e)
        {
            _logger.LogDebug("{LogPrefix} OnEvent {EventName}", _logPrefix, name);

            if (handler == null)
            {
                return;
            }

            try
            {
                handler(this, e);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "{LogPrefix} {Name} exception occurred", _logPrefix, name);
            }
        }

        /// <summary>Raises the <see cref="TaskError"/> event.</summary>
        /// <param name="e">The event arguments.</param>
        protected virtual void OnTaskError(TaskEventArgs<T> e)
        {
            OnEvent(TaskError, "TaskError", e);
        }

        /// <summary>Raises the <see cref="TaskStarting"/> event.</summary>
        /// <param name="e">The event arguments.</param>
        protected virtual void OnTaskStarting(TaskEventArgs<T> e)
        {
            OnEvent(TaskStarting, "TaskStarting", e);
        }

        /// <summary>Raises the <see cref="TaskCompleted"/> event.</summary>
        /// <param name="e">The event arguments.</param>
        protected virtual void OnTaskCompleted(TaskEventArgs<T> e)
        {
            OnEvent(TaskCompleted, "TaskCompleted", e);
        }

        /// <summary>Raises the <see cref="TaskCancelled"/> event, then disposes the task.</summary>
        /// <param name="e">The event arguments.</param>
        protected virtual void OnTaskCancelled(TaskEventArgs<T> e)
        {
            OnEvent(TaskCancelled, "TaskCancelled", e);

            // dispose it
            e.Task.Dispose();
        }

        #endregion

        #region IDisposable

        private readonly object _disposalLocker = new object();

        /// <summary>
        /// Gets a value indicating whether the runner has been disposed.
        /// </summary>
        public bool IsDisposed { get; private set; }

        ~BackgroundTaskRunner()
        {
            Dispose(false);
        }

        /// <summary>
        /// Disposes the runner, forcing it to shut down without waiting.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Disposes the runner.
        /// </summary>
        /// <param name="disposing">True when invoked from <see cref="Dispose()"/>; nothing is done
        /// when false (ie when invoked from the finalizer).</param>
        protected virtual void Dispose(bool disposing)
        {
            if (IsDisposed || disposing == false)
            {
                return;
            }

            lock (_disposalLocker)
            {
                if (IsDisposed)
                {
                    return;
                }

                DisposeResources();
                IsDisposed = true;
            }
        }

        /// <summary>
        /// Releases the resources held by the runner.
        /// </summary>
        protected virtual void DisposeResources()
        {
            // just make sure we eventually go down
            Shutdown(true, false);
        }

        #endregion

        #region IRegisteredObject.Stop

        /// <summary>
        /// Used by IRegisteredObject.Stop and shutdown on threadpool threads to not block shutdown times.
        /// </summary>
        /// <param name="immediate">True to cancel tasks and stop now; false to let queued tasks wind down first.</param>
        /// <returns>
        /// An awaitable Task that is used to handle the shutdown.
        /// </returns>
        internal Task StopInternal(bool immediate)
        {
            // the first time the hosting environment requests that the runner terminates,
            // raise the Terminating event - that could be used to prevent any process that
            // would expect the runner to be available from starting.
            var onTerminating = false;
            lock (_locker)
            {
                if (_terminating == false)
                {
                    _terminating = true;
                    _logger.LogInformation("{LogPrefix} Terminating {Immediate}", _logPrefix, immediate ? immediate.ToString() : string.Empty);
                    onTerminating = true;
                }
            }

            if (onTerminating)
            {
                OnEvent(Terminating, "Terminating");
            }

            // Run the Stop commands on another thread since IRegisteredObject.Stop calls are called sequentially
            // with a single aspnet thread during shutdown and we don't want to delay other calls to IRegisteredObject.Stop.
            if (!immediate)
            {
                using (ExecutionContext.SuppressFlow())
                {
                    return Task.Run(StopInitial, CancellationToken.None);
                }
            }

            lock (_locker)
            {
                if (_terminated)
                {
                    return Task.CompletedTask;
                }

                using (ExecutionContext.SuppressFlow())
                {
                    return Task.Run(StopImmediate, CancellationToken.None);
                }
            }
        }

        /// <summary>
        /// Requests a registered object to un-register.
        /// </summary>
        /// <param name="immediate">true to indicate the registered object should un-register from the hosting
        /// environment before returning; otherwise, false.</param>
        /// <remarks>
        /// <para>"When the application manager needs to stop a registered object, it will call the Stop method."</para>
        /// <para>The application manager will call the Stop method to ask a registered object to un-register. During
        /// processing of the Stop method, the registered object must call the applicationShutdownRegistry.UnregisterObject method.</para>
        /// <para>The work is queued by <see cref="StopInternal"/>; this method does not wait for it.</para>
        /// </remarks>
        public void Stop(bool immediate) => StopInternal(immediate);

        /// <summary>
        /// Called when immediate == false for IRegisteredObject.Stop(bool immediate)
        /// </summary>
        /// <remarks>
        /// Called on a threadpool thread
        /// </remarks>
        private void StopInitial()
        {
            // immediate == false when the app is trying to wind down, immediate == true will be called either:
            // after a call with immediate == false or if the app is not trying to wind down and needs to immediately stop.
            // So Stop may be called twice or sometimes only once.

            try
            {
                Shutdown(false, false); // do not accept any more tasks, flush the queue, do not wait
            }
            finally
            {
                // raise the completed event only after the running threading task has completed
                //
                // capture the running task under the lock, but act on it outside the lock:
                // StopImmediate raises events and may wait, which must not happen while holding _locker
                Task runningTask;
                lock (_locker)
                {
                    runningTask = _runningTask;
                }

                if (runningTask != null)
                {
                    runningTask.ContinueWith(
                        _ => StopImmediate(),
                        // Must explicitly specify this, see https://blog.stephencleary.com/2013/10/continuewith-is-dangerous-too.html
                        TaskScheduler.Default);
                }
                else
                {
                    StopImmediate();
                }
            }

            // If the shutdown token was not canceled in the Shutdown call above, it means there was still tasks
            // being processed, in which case we'll give it a couple seconds
            if (!_shutdownToken.IsCancellationRequested)
            {
                // If we are called with immediate == false, wind down above and then shutdown within 2 seconds,
                // we want to shut down the app as quick as possible, if we wait until immediate == true, this can
                // take a very long time since immediate will only be true when a new request is received on the new
                // appdomain (or another iis timeout occurs ... which can take some time).
                Thread.Sleep(2000); // we are already on a threadpool thread
                StopImmediate();
            }
        }

        /// <summary>
        /// Called when immediate == true for IRegisteredObject.Stop(bool immediate)
        /// </summary>
        /// <remarks>
        /// Called on a threadpool thread
        /// </remarks>
        private void StopImmediate()
        {
            _logger.LogInformation("{LogPrefix} Canceling tasks", _logPrefix);
            try
            {
                Shutdown(true, true); // cancel all tasks, wait for the current one to end
            }
            finally
            {
                Terminate(true);
            }
        }

        // called by Stop either immediately or eventually
        private void Terminate(bool immediate)
        {
            // signal the environment we have terminated
            // log
            // raise the Terminated event
            // complete the awaitable completion source, if any

            if (immediate)
            {
                // only unregister when it's the final call, else we won't be notified of the final call
                _applicationShutdownRegistry.UnregisterObject(this);
            }

            // check-and-set atomically: Terminate can be reached from several threads
            // and the Terminated event must be raised only once
            lock (_locker)
            {
                if (_terminated)
                {
                    return; // already taken care of
                }

                _terminated = true;
            }

            _logger.LogInformation("{LogPrefix} Tasks {TaskStatus}, terminated", _logPrefix, immediate ? "cancelled" : "completed");

            OnEvent(Terminated, "Terminated");

            _terminatedSource.TrySetResult(0);
        }

        #endregion
    }
}