Load Balancing: Implement distributed background jobs (#20397)

* Start work

* Introduce dto

* Start making repository

* Add migrations

* Implement fetchable first job

* Fix up to also finish tasks

* Refactor jobs to distributed background jobs

* Filter jobs correctly on LastRun

* Hardcode delay

* Add settings to configure delay and period

* Fix formatting

* Add default data

* Add update on startup, which will update periods on startup

* Refactor service to return job directly

* Update src/Umbraco.Infrastructure/Services/Implement/DistributedJobService.cs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update src/Umbraco.Infrastructure/BackgroundJobs/DistributedBackgroundJobHostedService.cs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update src/Umbraco.Infrastructure/Migrations/Install/DatabaseDataCreator.cs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update src/Umbraco.Infrastructure/Migrations/Install/DatabaseDataCreator.cs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update src/Umbraco.Infrastructure/BackgroundJobs/DistributedBackgroundJobHostedService.cs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Remove unused

* Move jobs and make internal

* make OpenIddictCleanupJob.cs public, as it is used elsewhere

* Minor docstring changes

* Update src/Umbraco.Core/Persistence/Constants-Locks.cs

Co-authored-by: Mole <nikolajlauridsen@protonmail.ch>

* ´Throw correct exceptions

* Update xml doc

* Remove business logic from repository

* Remove more business logic from repository into service

* Remove adding jobs from migration

* fix creation

* Rename to ExecuteAsync

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: mole <nikolajlauridsen@protonmail.ch>
This commit is contained in:
Nikolaj Geisle
2025-10-07 18:49:21 +02:00
committed by GitHub
parent f0cf4703fa
commit 20de48a496
39 changed files with 776 additions and 177 deletions

View File

@@ -0,0 +1,55 @@
using Microsoft.Extensions.Options;
using Umbraco.Cms.Core.Configuration.Models;
using Umbraco.Cms.Core.Persistence.Repositories;
using Umbraco.Cms.Core.Scoping;
namespace Umbraco.Cms.Infrastructure.BackgroundJobs.Jobs.DistributedJobs;
/// <summary>
/// A background job that prunes cache instructions from the database.
/// </summary>
internal class CacheInstructionsPruningJob : IDistributedBackgroundJob
{
private readonly IOptions<GlobalSettings> _globalSettings;
private readonly ICacheInstructionRepository _cacheInstructionRepository;
private readonly ICoreScopeProvider _scopeProvider;
private readonly TimeProvider _timeProvider;
/// <summary>
/// Initializes a new instance of the <see cref="CacheInstructionsPruningJob"/> class.
/// </summary>
/// <param name="scopeProvider">Provides scopes for database operations.</param>
/// <param name="globalSettings">The global settings configuration.</param>
/// <param name="cacheInstructionRepository">The repository for cache instructions.</param>
/// <param name="timeProvider">The time provider.</param>
public CacheInstructionsPruningJob(
IOptions<GlobalSettings> globalSettings,
ICacheInstructionRepository cacheInstructionRepository,
ICoreScopeProvider scopeProvider,
TimeProvider timeProvider)
{
_globalSettings = globalSettings;
_cacheInstructionRepository = cacheInstructionRepository;
_scopeProvider = scopeProvider;
_timeProvider = timeProvider;
Period = globalSettings.Value.DatabaseServerMessenger.TimeBetweenPruneOperations;
}
public string Name => "CacheInstructionsPruningJob";
/// <inheritdoc />
public TimeSpan Period { get; }
/// <inheritdoc />
public Task ExecuteAsync()
{
DateTimeOffset pruneDate = _timeProvider.GetUtcNow() - _globalSettings.Value.DatabaseServerMessenger.TimeToRetainInstructions;
using (ICoreScope scope = _scopeProvider.CreateCoreScope())
{
_cacheInstructionRepository.DeleteInstructionsOlderThan(pruneDate.DateTime);
scope.Complete();
}
return Task.CompletedTask;
}
}

View File

@@ -0,0 +1,63 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Umbraco.Cms.Core.Configuration.Models;
using Umbraco.Cms.Core.Services;
namespace Umbraco.Cms.Infrastructure.BackgroundJobs.Jobs.DistributedJobs;
/// <summary>
/// Recurring hosted service that executes the content history cleanup.
/// </summary>
internal class ContentVersionCleanupJob : IDistributedBackgroundJob
{
/// <inheritdoc />
public string Name => "ContentVersionCleanupJob";
/// <inheritdoc />
public TimeSpan Period { get => TimeSpan.FromHours(1); }
private readonly ILogger<ContentVersionCleanupJob> _logger;
private readonly IContentVersionService _service;
private readonly IOptionsMonitor<ContentSettings> _settingsMonitor;
/// <summary>
/// Initializes a new instance of the <see cref="ContentVersionCleanupJob" /> class.
/// </summary>
public ContentVersionCleanupJob(
ILogger<ContentVersionCleanupJob> logger,
IOptionsMonitor<ContentSettings> settingsMonitor,
IContentVersionService service)
{
_logger = logger;
_settingsMonitor = settingsMonitor;
_service = service;
}
/// <inheritdoc />
public Task ExecuteAsync()
{
// Globally disabled by feature flag
if (!_settingsMonitor.CurrentValue.ContentVersionCleanupPolicy.EnableCleanup)
{
_logger.LogInformation(
"ContentVersionCleanup task will not run as it has been globally disabled via configuration");
return Task.CompletedTask;
}
var count = _service.PerformContentVersionCleanup(DateTime.UtcNow).Count;
if (count > 0)
{
_logger.LogInformation("Deleted {count} ContentVersion(s)", count);
}
else
{
_logger.LogDebug("Task complete, no items were Deleted");
}
return Task.CompletedTask;
}
}

View File

@@ -0,0 +1,106 @@
// Copyright (c) Umbraco.
// See LICENSE for more details.
using Microsoft.Extensions.Options;
using Umbraco.Cms.Core.Configuration.Models;
using Umbraco.Cms.Core.Events;
using Umbraco.Cms.Core.HealthChecks;
using Umbraco.Cms.Core.HealthChecks.NotificationMethods;
using Umbraco.Cms.Core.Logging;
using Umbraco.Cms.Core.Notifications;
using Umbraco.Cms.Core.Scoping;
namespace Umbraco.Cms.Infrastructure.BackgroundJobs.Jobs.DistributedJobs;
/// <summary>
/// Hosted service implementation for recurring health check notifications.
/// </summary>
internal class HealthCheckNotifierJob : IDistributedBackgroundJob
{
/// <inheritdoc />
public string Name => "HealthCheckNotifierJob";
/// <inheritdoc/>
public TimeSpan Period { get; private set; }
private readonly HealthCheckCollection _healthChecks;
private readonly HealthCheckNotificationMethodCollection _notifications;
private readonly IProfilingLogger _profilingLogger;
private readonly IEventAggregator _eventAggregator;
private readonly ICoreScopeProvider _scopeProvider;
private HealthChecksSettings _healthChecksSettings;
/// <summary>
/// Initializes a new instance of the <see cref="HealthCheckNotifierJob" /> class.
/// </summary>
/// <param name="healthChecksSettings">The configuration for health check settings.</param>
/// <param name="healthChecks">The collection of healthchecks.</param>
/// <param name="notifications">The collection of healthcheck notification methods.</param>
/// <param name="scopeProvider">Provides scopes for database operations.</param>
/// <param name="logger">The typed logger.</param>
/// <param name="profilingLogger">The profiling logger.</param>
/// <param name="cronTabParser">Parser of crontab expressions.</param>
/// <param name="eventAggregator"></param>
public HealthCheckNotifierJob(
IOptionsMonitor<HealthChecksSettings> healthChecksSettings,
HealthCheckCollection healthChecks,
HealthCheckNotificationMethodCollection notifications,
ICoreScopeProvider scopeProvider,
IProfilingLogger profilingLogger,
IEventAggregator eventAggregator)
{
_healthChecksSettings = healthChecksSettings.CurrentValue;
_healthChecks = healthChecks;
_notifications = notifications;
_scopeProvider = scopeProvider;
_profilingLogger = profilingLogger;
_eventAggregator = eventAggregator;
Period = healthChecksSettings.CurrentValue.Notification.Period;
healthChecksSettings.OnChange(x =>
{
_healthChecksSettings = x;
Period = x.Notification.Period;
});
}
/// <inheritdoc/>
public async Task ExecuteAsync()
{
if (_healthChecksSettings.Notification.Enabled == false)
{
return;
}
// Ensure we use an explicit scope since we are running on a background thread and plugin health
// checks can be making service/database calls so we want to ensure the CallContext/Ambient scope
// isn't used since that can be problematic.
using (ICoreScope scope = _scopeProvider.CreateCoreScope(autoComplete: true))
using (_profilingLogger.DebugDuration<HealthCheckNotifierJob>("Health checks executing", "Health checks complete"))
{
// Don't notify for any checks that are disabled, nor for any disabled just for notifications.
Guid[] disabledCheckIds = _healthChecksSettings.Notification.DisabledChecks
.Select(x => x.Id)
.Union(_healthChecksSettings.DisabledChecks
.Select(x => x.Id))
.Distinct()
.ToArray();
IEnumerable<HealthCheck> checks = _healthChecks
.Where(x => disabledCheckIds.Contains(x.Id) == false);
HealthCheckResults results = await HealthCheckResults.Create(checks);
results.LogResults();
_eventAggregator.Publish(new HealthCheckCompletedNotification(results));
// Send using registered notification methods that are enabled.
foreach (IHealthCheckNotificationMethod notificationMethod in _notifications.Where(x => x.Enabled))
{
await notificationMethod.SendAsync(results);
}
}
}
}

View File

@@ -0,0 +1,63 @@
// Copyright (c) Umbraco.
// See LICENSE for more details.
using Microsoft.Extensions.Options;
using Umbraco.Cms.Core.Configuration.Models;
using Umbraco.Cms.Core.Logging;
using Umbraco.Cms.Core.Scoping;
using Umbraco.Cms.Core.Services;
namespace Umbraco.Cms.Infrastructure.BackgroundJobs.Jobs.DistributedJobs;
/// <summary>
/// Log scrubbing hosted service.
/// </summary>
/// <remarks>
/// Will only run on non-replica servers.
/// </remarks>
internal class LogScrubberJob : IDistributedBackgroundJob
{
private readonly IAuditService _auditService;
private readonly IProfilingLogger _profilingLogger;
private readonly ICoreScopeProvider _scopeProvider;
private LoggingSettings _settings;
/// <inheritdoc />
public string Name => "LogScrubberJob";
/// <inheritdoc />
public TimeSpan Period => TimeSpan.FromHours(4);
/// <summary>
/// Initializes a new instance of the <see cref="LogScrubberJob" /> class.
/// </summary>
/// <param name="auditService">Service for handling audit operations.</param>
/// <param name="settings">The configuration for logging settings.</param>
/// <param name="scopeProvider">Provides scopes for database operations.</param>
/// <param name="logger">The typed logger.</param>
/// <param name="profilingLogger">The profiling logger.</param>
public LogScrubberJob(
IAuditService auditService,
IOptionsMonitor<LoggingSettings> settings,
ICoreScopeProvider scopeProvider,
IProfilingLogger profilingLogger)
{
_auditService = auditService;
_settings = settings.CurrentValue;
_scopeProvider = scopeProvider;
_profilingLogger = profilingLogger;
settings.OnChange(x => _settings = x);
}
/// <inheritdoc/>
public async Task ExecuteAsync()
{
// Ensure we use an explicit scope since we are running on a background thread.
using ICoreScope scope = _scopeProvider.CreateCoreScope();
using (_profilingLogger.DebugDuration<LogScrubberJob>("Log scrubbing executing", "Log scrubbing complete"))
{
await _auditService.CleanLogsAsync((int)_settings.MaxLogAge.TotalMinutes);
_ = scope.Complete();
}
}
}

View File

@@ -0,0 +1,51 @@
using Microsoft.Extensions.Options;
using Umbraco.Cms.Core.Configuration.Models;
using Umbraco.Cms.Core.Persistence.Repositories;
using Umbraco.Cms.Core.Scoping;
namespace Umbraco.Cms.Infrastructure.BackgroundJobs.Jobs.DistributedJobs;
/// <summary>
/// Cleans up long-running operations that have exceeded a specified age.
/// </summary>
internal class LongRunningOperationsCleanupJob : IDistributedBackgroundJob
{
private readonly ICoreScopeProvider _scopeProvider;
private readonly ILongRunningOperationRepository _longRunningOperationRepository;
private readonly TimeProvider _timeProvider;
private readonly TimeSpan _maxEntryAge;
/// <summary>
/// Initializes a new instance of the <see cref="LongRunningOperationsCleanupJob"/> class.
/// </summary>
/// <param name="options">The long-running operations settings.</param>
/// <param name="scopeProvider">The scope provider for managing database transactions.</param>
/// <param name="longRunningOperationRepository">The repository for managing long-running operations.</param>
/// <param name="timeProvider">The time provider for getting the current time.</param>
public LongRunningOperationsCleanupJob(
IOptions<LongRunningOperationsSettings> options,
ICoreScopeProvider scopeProvider,
ILongRunningOperationRepository longRunningOperationRepository,
TimeProvider timeProvider)
{
_scopeProvider = scopeProvider;
_longRunningOperationRepository = longRunningOperationRepository;
_timeProvider = timeProvider;
_maxEntryAge = options.Value.Cleanup.MaxEntryAge;
Period = options.Value.Cleanup.Period;
}
/// <inheritdoc />
public string Name => "LongRunningOperationsCleanupJob";
/// <inheritdoc />
public TimeSpan Period { get; }
/// <inheritdoc />
public async Task ExecuteAsync()
{
using ICoreScope scope = _scopeProvider.CreateCoreScope();
await _longRunningOperationRepository.CleanOperationsAsync(_timeProvider.GetUtcNow() - _maxEntryAge);
scope.Complete();
}
}

View File

@@ -0,0 +1,68 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using OpenIddict.Abstractions;
namespace Umbraco.Cms.Infrastructure.BackgroundJobs.Jobs.DistributedJobs;
/// <summary>
/// Port of the OpenIddict Quartz job for cleaning up - see https://github.com/openiddict/openiddict-core/tree/dev/src/OpenIddict.Quartz
/// </summary>
public class OpenIddictCleanupJob : IDistributedBackgroundJob
{
/// <inheritdoc />
public string Name => "OpenIddictCleanupJob";
/// <inheritdoc />
public TimeSpan Period => TimeSpan.FromHours(1);
// keep tokens and authorizations in the database for 7 days
// - NOTE: this is NOT the same as access token lifetime, which is likely very short
private const int LifespanInSeconds = 7 * 24 * 60 * 60;
private readonly ILogger<OpenIddictCleanupJob> _logger;
private readonly IServiceProvider _provider;
/// <summary>
/// Initializes a new instance of the <see cref="OpenIddictCleanupJob"/> class.
/// </summary>
/// <param name="logger"></param>
/// <param name="provider"></param>
public OpenIddictCleanupJob(ILogger<OpenIddictCleanupJob> logger, IServiceProvider provider)
{
_logger = logger;
_provider = provider;
}
/// <inheritdoc />
public async Task ExecuteAsync()
{
// hosted services are registered as singletons, but this particular one consumes scoped services... so
// we have to fetch the service dependencies manually using a new scope per invocation.
IServiceScope scope = _provider.CreateScope();
DateTimeOffset threshold = DateTimeOffset.UtcNow - TimeSpan.FromSeconds(LifespanInSeconds);
try
{
IOpenIddictTokenManager tokenManager = scope.ServiceProvider.GetService<IOpenIddictTokenManager>()
?? throw new InvalidOperationException($"Could not retrieve an {nameof(IOpenIddictTokenManager)} service from the current scope");
await tokenManager.PruneAsync(threshold);
}
catch (Exception exception)
{
_logger.LogError(exception, "Unable to prune OpenIddict tokens");
}
try
{
IOpenIddictAuthorizationManager authorizationManager = scope.ServiceProvider.GetService<IOpenIddictAuthorizationManager>()
?? throw new InvalidOperationException($"Could not retrieve an {nameof(IOpenIddictAuthorizationManager)} service from the current scope");
await authorizationManager.PruneAsync(threshold);
}
catch (Exception exception)
{
_logger.LogError(exception, "Unable to prune OpenIddict authorizations");
}
}
}

View File

@@ -0,0 +1,110 @@
// Copyright (c) Umbraco.
// See LICENSE for more details.
using Microsoft.Extensions.Logging;
using Umbraco.Cms.Core;
using Umbraco.Cms.Core.Scoping;
using Umbraco.Cms.Core.Services;
using Umbraco.Cms.Core.Sync;
using Umbraco.Cms.Core.Web;
namespace Umbraco.Cms.Infrastructure.BackgroundJobs.Jobs.DistributedJobs;
/// <summary>
/// Hosted service implementation for scheduled publishing feature.
/// </summary>
/// <remarks>
/// Runs only on non-replica servers.
/// </remarks>
internal class ScheduledPublishingJob : IDistributedBackgroundJob
{
/// <inheritdoc />
public string Name => "ScheduledPublishingJob";
/// <inheritdoc />
public TimeSpan Period => TimeSpan.FromMinutes(1);
private readonly IContentService _contentService;
private readonly ILogger<ScheduledPublishingJob> _logger;
private readonly ICoreScopeProvider _scopeProvider;
private readonly TimeProvider _timeProvider;
private readonly IServerMessenger _serverMessenger;
private readonly IUmbracoContextFactory _umbracoContextFactory;
/// <summary>
/// Initializes a new instance of the <see cref="ScheduledPublishingJob" /> class.
/// </summary>
public ScheduledPublishingJob(
IContentService contentService,
IUmbracoContextFactory umbracoContextFactory,
ILogger<ScheduledPublishingJob> logger,
IServerMessenger serverMessenger,
ICoreScopeProvider scopeProvider,
TimeProvider timeProvider)
{
_contentService = contentService;
_umbracoContextFactory = umbracoContextFactory;
_logger = logger;
_serverMessenger = serverMessenger;
_scopeProvider = scopeProvider;
_timeProvider = timeProvider;
}
/// <inheritdoc />
public Task ExecuteAsync()
{
if (Suspendable.ScheduledPublishing.CanRun == false)
{
return Task.CompletedTask;
}
try
{
// Ensure we run with an UmbracoContext, because this will run in a background task,
// and developers may be using the UmbracoContext in the event handlers.
// TODO: or maybe not, CacheRefresherComponent already ensures a context when handling events
// - UmbracoContext 'current' needs to be refactored and cleaned up
// - batched messenger should not depend on a current HttpContext
// but then what should be its "scope"? could we attach it to scopes?
// - and we should definitively *not* have to flush it here (should be auto)
using UmbracoContextReference contextReference = _umbracoContextFactory.EnsureUmbracoContext();
using ICoreScope scope = _scopeProvider.CreateCoreScope(autoComplete: true);
/* We used to assume that there will never be two instances running concurrently where (IsMainDom && ServerRole == SchedulingPublisher)
* However this is possible during an azure deployment slot swap for the SchedulingPublisher instance when trying to achieve zero downtime deployments.
* If we take a distributed write lock, we are certain that the multiple instances of the job will not run in parallel.
* It's possible that during the swapping process we may run this job more frequently than intended but this is not of great concern and it's
* only until the old SchedulingPublisher shuts down. */
scope.EagerWriteLock(Constants.Locks.ScheduledPublishing);
try
{
// Run
IEnumerable<PublishResult> result = _contentService.PerformScheduledPublish(_timeProvider.GetUtcNow().UtcDateTime);
foreach (IGrouping<PublishResultType, PublishResult> grouped in result.GroupBy(x => x.Result))
{
_logger.LogInformation(
"Scheduled publishing result: '{StatusCount}' items with status {Status}",
grouped.Count(),
grouped.Key);
}
}
finally
{
// If running on a temp context, we have to flush the messenger
if (contextReference.IsRoot)
{
_serverMessenger.SendMessages();
}
}
}
catch (Exception ex)
{
// important to catch *everything* to ensure the task repeats
_logger.LogError(ex, "Failed.");
}
return Task.CompletedTask;
}
}

View File

@@ -0,0 +1,47 @@
using Microsoft.Extensions.Logging;
using Umbraco.Cms.Core.Services;
namespace Umbraco.Cms.Infrastructure.BackgroundJobs.Jobs.DistributedJobs;
/// <summary>
/// Cleans up temporary media files.
/// </summary>
internal class TemporaryFileCleanupJob : IDistributedBackgroundJob
{
/// <inheritdoc />
public string Name => "TemporaryFileCleanupJob";
/// <inheritdoc />
public TimeSpan Period => TimeSpan.FromMinutes(5);
private readonly ILogger<TemporaryFileCleanupJob> _logger;
private readonly ITemporaryFileService _service;
/// <summary>
/// Initializes a new instance of the <see cref="TemporaryFileCleanupJob" /> class.
/// </summary>
public TemporaryFileCleanupJob(
ILogger<TemporaryFileCleanupJob> logger,
ITemporaryFileService temporaryFileService)
{
_logger = logger;
_service = temporaryFileService;
}
/// <inheritdoc />
public async Task ExecuteAsync()
{
var count = (await _service.CleanUpOldTempFiles()).Count();
if (count > 0)
{
_logger.LogDebug("Deleted {Count} temporary file(s)", count);
}
else
{
_logger.LogDebug("Task complete, no items were deleted");
}
}
}

View File

@@ -0,0 +1,150 @@
using System.Net.Mime;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Umbraco.Cms.Core;
using Umbraco.Cms.Core.Configuration.Models;
using Umbraco.Cms.Core.Models;
using Umbraco.Cms.Core.Scoping;
using Umbraco.Cms.Core.Services;
namespace Umbraco.Cms.Infrastructure.BackgroundJobs.Jobs.DistributedJobs;
/// <summary>
/// Fires pending webhooks.
/// </summary>
internal class WebhookFiring : IDistributedBackgroundJob
{
private readonly ILogger<WebhookFiring> _logger;
private readonly IWebhookRequestService _webhookRequestService;
private readonly IWebhookLogFactory _webhookLogFactory;
private readonly IWebhookLogService _webhookLogService;
private readonly IWebhookService _webHookService;
private readonly ICoreScopeProvider _coreScopeProvider;
private readonly IHttpClientFactory _httpClientFactory;
private WebhookSettings _webhookSettings;
/// <inheritdoc />
public string Name => "WebhookFiring";
/// <inheritdoc />
public TimeSpan Period => _webhookSettings.Period;
/// <summary>
/// Initializes a new instance of the <see cref="WebhookFiring"/> class.
/// </summary>
/// <param name="logger"></param>
/// <param name="webhookRequestService"></param>
/// <param name="webhookLogFactory"></param>
/// <param name="webhookLogService"></param>
/// <param name="webHookService"></param>
/// <param name="webhookSettings"></param>
/// <param name="coreScopeProvider"></param>
/// <param name="httpClientFactory"></param>
public WebhookFiring(
ILogger<WebhookFiring> logger,
IWebhookRequestService webhookRequestService,
IWebhookLogFactory webhookLogFactory,
IWebhookLogService webhookLogService,
IWebhookService webHookService,
IOptionsMonitor<WebhookSettings> webhookSettings,
ICoreScopeProvider coreScopeProvider,
IHttpClientFactory httpClientFactory)
{
_logger = logger;
_webhookRequestService = webhookRequestService;
_webhookLogFactory = webhookLogFactory;
_webhookLogService = webhookLogService;
_webHookService = webHookService;
_coreScopeProvider = coreScopeProvider;
_httpClientFactory = httpClientFactory;
_webhookSettings = webhookSettings.CurrentValue;
webhookSettings.OnChange(x => _webhookSettings = x);
}
/// <inheritdoc />
public async Task ExecuteAsync()
{
if (_webhookSettings.Enabled is false)
{
_logger.LogInformation("WebhookFiring task will not run as it has been globally disabled via configuration");
return;
}
IEnumerable<WebhookRequest> requests;
using (ICoreScope scope = _coreScopeProvider.CreateCoreScope())
{
scope.ReadLock(Constants.Locks.WebhookRequest);
requests = await _webhookRequestService.GetAllAsync();
scope.Complete();
}
// Send webhook requests in parallel on a suppressed ExecutionContext to avoid deadlocks (each task will create its own root IScope)
await Task.WhenAll(requests.Select(request =>
{
using (ExecutionContext.SuppressFlow())
{
return Task.Run(async () =>
{
IWebhook? webhook = await _webHookService.GetAsync(request.WebhookKey);
if (webhook is null)
{
return;
}
using HttpResponseMessage? response = await SendRequestAsync(webhook, request.EventAlias, request.RequestObject, request.RetryCount, CancellationToken.None);
if ((response?.IsSuccessStatusCode ?? false) || request.RetryCount >= _webhookSettings.MaximumRetries)
{
await _webhookRequestService.DeleteAsync(request);
}
else
{
request.RetryCount++;
await _webhookRequestService.UpdateAsync(request);
}
});
}
}));
}
private async Task<HttpResponseMessage?> SendRequestAsync(IWebhook webhook, string eventName, string? serializedObject, int retryCount, CancellationToken cancellationToken)
{
using HttpClient httpClient = _httpClientFactory.CreateClient(Constants.HttpClients.WebhookFiring);
using var request = new HttpRequestMessage(HttpMethod.Post, webhook.Url)
{
Version = httpClient.DefaultRequestVersion,
VersionPolicy = httpClient.DefaultVersionPolicy,
};
HttpResponseMessage? response = null;
Exception? exception = null;
try
{
// Add headers
request.Headers.Add(Constants.WebhookEvents.HeaderNames.EventName, eventName);
request.Headers.Add(Constants.WebhookEvents.HeaderNames.RetryCount, retryCount.ToString());
foreach (KeyValuePair<string, string> header in webhook.Headers)
{
request.Headers.Add(header.Key, header.Value);
}
// Set content
request.Content = new StringContent(serializedObject ?? string.Empty, Encoding.UTF8, MediaTypeNames.Application.Json);
// Send request
response = await httpClient.SendAsync(request, cancellationToken);
}
catch (Exception ex)
{
exception = ex;
_logger.LogError(ex, "Error while sending webhook request for webhook {WebhookKey}.", webhook.Key);
}
WebhookLog log = await _webhookLogFactory.CreateAsync(eventName, request, response, retryCount, exception, webhook, cancellationToken);
await _webhookLogService.CreateAsync(log);
return response;
}
}

View File

@@ -0,0 +1,69 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Umbraco.Cms.Core;
using Umbraco.Cms.Core.Configuration.Models;
using Umbraco.Cms.Core.Models;
using Umbraco.Cms.Core.Persistence.Repositories;
using Umbraco.Cms.Core.Scoping;
using Umbraco.Extensions;
namespace Umbraco.Cms.Infrastructure.BackgroundJobs.Jobs.DistributedJobs;
/// <summary>
/// Daily background job that removes all webhook log data older than x days as defined by <see cref="WebhookSettings.KeepLogsForDays"/>
/// </summary>
internal class WebhookLoggingCleanup : IDistributedBackgroundJob
{
private readonly ILogger<WebhookLoggingCleanup> _logger;
private readonly WebhookSettings _webhookSettings;
private readonly IWebhookLogRepository _webhookLogRepository;
private readonly ICoreScopeProvider _coreScopeProvider;
/// <summary>
/// Initializes a new instance of the <see cref="WebhookLoggingCleanup"/> class.
/// </summary>
/// <param name="logger"></param>
/// <param name="webhookSettings"></param>
/// <param name="webhookLogRepository"></param>
/// <param name="coreScopeProvider"></param>
public WebhookLoggingCleanup(ILogger<WebhookLoggingCleanup> logger, IOptionsMonitor<WebhookSettings> webhookSettings, IWebhookLogRepository webhookLogRepository, ICoreScopeProvider coreScopeProvider)
{
_logger = logger;
_webhookSettings = webhookSettings.CurrentValue;
_webhookLogRepository = webhookLogRepository;
_coreScopeProvider = coreScopeProvider;
}
/// <inheritdoc />
public string Name => "WebhookLoggingCleanup";
/// <inheritdoc />
public TimeSpan Period => TimeSpan.FromDays(1);
/// <inheritdoc />
public async Task ExecuteAsync()
{
if (_webhookSettings.EnableLoggingCleanup is false)
{
_logger.LogInformation("WebhookLoggingCleanup task will not run as it has been globally disabled via configuration");
return;
}
IEnumerable<WebhookLog> webhookLogs;
using (ICoreScope scope = _coreScopeProvider.CreateCoreScope())
{
scope.ReadLock(Constants.Locks.WebhookLogs);
webhookLogs = await _webhookLogRepository.GetOlderThanDate(DateTime.UtcNow - TimeSpan.FromDays(_webhookSettings.KeepLogsForDays));
scope.Complete();
}
foreach (IEnumerable<WebhookLog> group in webhookLogs.InGroupsOf(Constants.Sql.MaxParameterCount))
{
using ICoreScope scope = _coreScopeProvider.CreateCoreScope();
scope.WriteLock(Constants.Locks.WebhookLogs);
await _webhookLogRepository.DeleteByIds(group.Select(x => x.Id).ToArray());
scope.Complete();
}
}
}