using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
using System.Web;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Umbraco.Core.Cache;
using Umbraco.Core.IO;
using Umbraco.Core.Logging;
using Umbraco.Core.Models.Rdbms;
using Umbraco.Core.Persistence;
using umbraco.interfaces;
using Umbraco.Core.Persistence.SqlSyntax;

namespace Umbraco.Core.Sync
{
    /// <summary>
    /// An <see cref="IServerMessenger"/> that works by storing messages in the database.
    /// </summary>
    //
    // this messenger writes ALL instructions to the database,
    // but only processes instructions coming from remote servers,
    // thus ensuring that instructions run only once
    //
    public class DatabaseServerMessenger : ServerMessengerBase
    {
        private readonly ApplicationContext _appContext;
        private readonly DatabaseServerMessengerOptions _options;

        // reset when a sync starts, set when it ends; the MainDom release callback
        // waits on it so that it does not return while a sync is still running
        private readonly ManualResetEvent _syncIdle;

        // guards _syncing, _released, _lastSync, _initialized and the _syncIdle transitions
        private readonly object _locko = new object();

        private readonly ILogger _logger;

        // id of the last processed instruction; a negative value means "never synced"
        private int _lastId = -1;

        private DateTime _lastSync;
        private bool _initialized;
        private bool _syncing;

        // set by the MainDom release callback; once true no further syncs are started
        private bool _released;

        private readonly ProfilingLogger _profilingLogger;

        /// <summary>
        /// Gets the application context this messenger was created with.
        /// </summary>
        protected ApplicationContext ApplicationContext
        {
            get { return _appContext; }
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="DatabaseServerMessenger"/> class.
        /// </summary>
        /// <param name="appContext">The application context.</param>
        /// <param name="distributedEnabled">Whether distributed calls are enabled.</param>
        /// <param name="options">The messenger options.</param>
        /// <exception cref="ArgumentNullException"><paramref name="appContext"/> or <paramref name="options"/> is null.</exception>
        public DatabaseServerMessenger(ApplicationContext appContext, bool distributedEnabled, DatabaseServerMessengerOptions options)
            : base(distributedEnabled)
        {
            if (appContext == null) throw new ArgumentNullException("appContext");
            if (options == null) throw new ArgumentNullException("options");

            _appContext = appContext;
            _options = options;
            _lastSync = DateTime.UtcNow;
            _syncIdle = new ManualResetEvent(true); // idle until the first sync starts
            _profilingLogger = appContext.ProfilingLogger;
            _logger = appContext.ProfilingLogger.Logger;
        }

        #region Messenger

        /// <summary>
        /// Determines whether a distributed call is required.
        /// </summary>
        /// <returns>True only once the messenger is initialized and distributed calls are enabled.</returns>
        protected override bool RequiresDistributed(IEnumerable<IServerAddress> servers, ICacheRefresher refresher, MessageType dispatchType)
        {
            // we don't care if there's servers listed or not,
            // if distributed call is enabled we will make the call
            return _initialized && DistributedEnabled;
        }

        /// <summary>
        /// Writes the refresh instructions for a remote delivery as a new row in the
        /// cache instruction table, stamped with the current UTC time and <see cref="LocalIdentity"/>.
        /// </summary>
        /// <exception cref="ArgumentException">The ids are not all of the same type (int or Guid).</exception>
        protected override void DeliverRemote(
            IEnumerable<IServerAddress> servers,
            ICacheRefresher refresher,
            MessageType messageType,
            IEnumerable<object> ids = null,
            string json = null)
        {
            var idsA = ids == null ? null : ids.ToArray();

            Type idType;
            if (GetArrayType(idsA, out idType) == false)
                throw new ArgumentException("All items must be of the same type, either int or Guid.", "ids");

            var instructions = RefreshInstruction.GetInstructions(refresher, messageType, idsA, idType, json);

            var dto = new CacheInstructionDto
            {
                UtcStamp = DateTime.UtcNow,
                Instructions = JsonConvert.SerializeObject(instructions, Formatting.None),
                OriginIdentity = LocalIdentity
            };

            ApplicationContext.DatabaseContext.Database.Insert(dto);
        }

        #endregion

        #region Sync

        /// <summary>
        /// Boots the messenger.
        /// </summary>
        /// <remarks>
        /// Thread safety: this is NOT thread safe. Because it is NOT meant to run multi-threaded.
        /// Callers MUST ensure thread-safety.
        /// </remarks>
        protected void Boot()
        {
            // weight:10, must release *before* the facade service, because once released
            // the service will *not* be able to properly handle our notifications anymore
            const int weight = 10;

            var registered = ApplicationContext.MainDom.Register(
                () =>
                {
                    lock (_locko)
                    {
                        _released = true; // no more syncs
                    }
                    _syncIdle.WaitOne(); // wait for pending sync
                },
                weight);

            if (registered == false)
                return;

            ReadLastSynced(); // get _lastId
            EnsureInstructions(); // reset _lastId if instrs are missing
            Initialize(); // boot
        }

        /// <summary>
        /// Initializes the server, cold-booting it when it has never synchronized before
        /// or when too many instructions are pending.
        /// </summary>
        /// <remarks>
        /// <para>On a cold boot, the last-synced id is moved to the latest instruction id found in
        /// the database and the configured initializing callbacks are executed.</para>
        /// <para>Thread safety: this is NOT thread safe. Because it is NOT meant to run multi-threaded.
        /// Callers MUST ensure thread-safety.</para>
        /// </remarks>
        private void Initialize()
        {
            lock (_locko)
            {
                if (_released) return;

                var coldboot = false;
                if (_lastId < 0) // never synced before
                {
                    // we haven't synced - in this case we aren't going to sync the whole thing, we will assume this is a new
                    // server and it will need to rebuild its own caches, eg Lucene or the xml cache file.
                    _logger.Warn<DatabaseServerMessenger>("No last synced Id found, this generally means this is a new server/install."
                        + " The server will build its caches and indexes, and then adjust its last synced Id to the latest found in"
                        + " the database and maintain cache updates based on that Id.");

                    coldboot = true;
                }
                else
                {
                    //check for how many instructions there are to process
                    var count = _appContext.DatabaseContext.Database.ExecuteScalar<int>("SELECT COUNT(*) FROM umbracoCacheInstruction WHERE id > @lastId", new { lastId = _lastId });
                    if (count > _options.MaxProcessingInstructionCount)
                    {
                        //too many instructions, proceed to cold boot
                        _logger.Warn<DatabaseServerMessenger>("The instruction count ({0}) exceeds the specified MaxProcessingInstructionCount ({1})."
                            + " The server will skip existing instructions, rebuild its caches and indexes entirely, adjust its last synced Id"
                            + " to the latest found in the database and maintain cache updates based on that Id.",
                            () => count, () => _options.MaxProcessingInstructionCount);

                        coldboot = true;
                    }
                }

                if (coldboot)
                {
                    // go get the last id in the db and store it
                    // note: do it BEFORE initializing otherwise some instructions might get lost
                    // when doing it before, some instructions might run twice - not an issue
                    var lastId = _appContext.DatabaseContext.Database.ExecuteScalar<int>("SELECT MAX(id) FROM umbracoCacheInstruction");
                    if (lastId > 0)
                        SaveLastSynced(lastId);

                    // execute initializing callbacks
                    if (_options.InitializingCallbacks != null)
                        foreach (var callback in _options.InitializingCallbacks)
                            callback();
                }

                _initialized = true;
            }
        }

        /// <summary>
        /// Synchronize the server (throttled).
        /// </summary>
        /// <remarks>
        /// Returns immediately if a sync is already running, the messenger has been released,
        /// or the last sync started less than <c>ThrottleSeconds</c> ago. Old instructions are
        /// pruned only when the current server role is Single or Master.
        /// </remarks>
        protected void Sync()
        {
            lock (_locko)
            {
                if (_syncing)
                    return;

                if (_released)
                    return;

                if ((DateTime.UtcNow - _lastSync).TotalSeconds <= _options.ThrottleSeconds)
                    return;

                _syncing = true;
                _syncIdle.Reset();
                _lastSync = DateTime.UtcNow;
            }

            try
            {
                using (_profilingLogger.DebugDuration<DatabaseServerMessenger>("Syncing from database..."))
                {
                    ProcessDatabaseInstructions();
                    switch (_appContext.GetCurrentServerRole())
                    {
                        case ServerRole.Single:
                        case ServerRole.Master:
                            PruneOldInstructions();
                            break;
                    }
                }
            }
            finally
            {
                // clear the flag AND signal the event under the same lock as the
                // check-and-reset above: otherwise another thread could start a sync
                // (and Reset the event) between our clearing _syncing and calling Set,
                // leaving _syncIdle signaled while that other sync is still running
                lock (_locko)
                {
                    _syncing = false;
                    _syncIdle.Set();
                }
            }
        }

        /// <summary>
        /// Process instructions from the database.
        /// </summary>
        /// <remarks>
        /// Thread safety: this is NOT thread safe. Because it is NOT meant to run multi-threaded.
        /// </remarks>
        private void ProcessDatabaseInstructions()
        {
            // NOTE
            // we 'could' recurse to ensure that no remaining instructions are pending in the table before proceeding but I don't think that
            // would be a good idea since instructions could keep getting added and then all other threads will probably get stuck from serving requests
            // (depending on what the cache refreshers are doing). I think it's best we do the one time check, process them and continue, if there are
            // pending requests after being processed, they'll just be processed on the next poll.
            //
            // FIXME not true if we're running on a background thread, assuming we can?

            var sqlSyntax = _appContext.DatabaseContext.SqlSyntax;
            var sql = new Sql().Select("*")
                .From<CacheInstructionDto>(sqlSyntax)
                .Where<CacheInstructionDto>(dto => dto.Id > _lastId)
                .OrderBy<CacheInstructionDto>(dto => dto.Id, sqlSyntax);

            var dtos = _appContext.DatabaseContext.Database.Fetch<CacheInstructionDto>(sql);
            if (dtos.Count == 0) return;

            // only process instructions coming from a remote server, and ignore instructions coming from
            // the local server as they've already been processed. We should NOT assume that the sequence of
            // instructions in the database makes any sense whatsoever, because it's all async.
            var localIdentity = LocalIdentity;

            var lastId = 0;
            foreach (var dto in dtos)
            {
                if (dto.OriginIdentity == localIdentity)
                {
                    // just skip that local one but update lastId nevertheless
                    lastId = dto.Id;
                    continue;
                }

                // deserialize remote instructions & skip if it fails
                JArray jsonA;
                try
                {
                    jsonA = JsonConvert.DeserializeObject<JArray>(dto.Instructions);
                }
                catch (JsonException ex)
                {
                    _logger.Error<DatabaseServerMessenger>(string.Format("Failed to deserialize instructions ({0}: \"{1}\").", dto.Id, dto.Instructions), ex);
                    lastId = dto.Id; // skip
                    continue;
                }

                // execute remote instructions & update lastId
                try
                {
                    NotifyRefreshers(jsonA);
                    lastId = dto.Id;
                }
                catch (Exception ex)
                {
                    _logger.Error<DatabaseServerMessenger>(
                        string.Format("DISTRIBUTED CACHE IS NOT UPDATED. Failed to execute instructions ({0}: \"{1}\"). Instruction is being skipped/ignored", dto.Id, dto.Instructions), ex);

                    //we cannot throw here because this invalid instruction will just keep getting processed over and over and errors
                    // will be thrown over and over. The only thing we can do is ignore and move on.
                    lastId = dto.Id;
                }
            }

            if (lastId > 0)
                SaveLastSynced(lastId);
        }

        /// <summary>
        /// Remove old instructions from the database
        /// </summary>
        /// <remarks>
        /// Always leave the last (most recent) record in the db table, this is so that not all instructions are removed which would cause
        /// the site to cold boot if there's been no instruction activity for more than DaysToRetainInstructions.
        /// See: http://issues.umbraco.org/issue/U4-7643#comment=67-25085
        /// </remarks>
        private void PruneOldInstructions()
        {
            var pruneDate = DateTime.UtcNow.AddDays(-_options.DaysToRetainInstructions);
            var sqlSyntax = _appContext.DatabaseContext.SqlSyntax;

            //NOTE: this query could work on SQL server and MySQL:
            /*
            SELECT id
            FROM umbracoCacheInstruction
            WHERE utcStamp < getdate()
            AND id <> (SELECT MAX(id) FROM umbracoCacheInstruction)
            */
            // However, this will not work on SQLCE and in fact it will be slower than the query we are
            // using if the SQL server doesn't perform its own query optimizations (i.e. since the above
            // query could actually execute a sub query for every row found). So we've had to go with an
            // inner join which is faster and works on SQLCE but it's uglier to read.

            var deleteQuery = new Sql().Select("cacheIns.id")
                .From("umbracoCacheInstruction cacheIns")
                .InnerJoin("(SELECT MAX(id) id FROM umbracoCacheInstruction) tMax")
                .On("cacheIns.id <> tMax.id")
                .Where("cacheIns.utcStamp < @pruneDate", new { pruneDate = pruneDate });

            var deleteSql = sqlSyntax.GetDeleteSubquery(
                "umbracoCacheInstruction",
                "id",
                deleteQuery);

            _appContext.DatabaseContext.Database.Execute(deleteSql);
        }

        /// <summary>
        /// Ensure that the last instruction that was processed is still in the database.
        /// </summary>
        /// <remarks>
        /// If the last instruction is not in the database anymore, then the messenger
        /// should not try to process any instructions, because some instructions might be lost,
        /// and it should instead cold-boot.
        /// </remarks>
        private void EnsureInstructions()
        {
            var sql = new Sql().Select("*")
                .From<CacheInstructionDto>(_appContext.DatabaseContext.SqlSyntax)
                .Where<CacheInstructionDto>(dto => dto.Id == _lastId);

            var dtos = _appContext.DatabaseContext.Database.Fetch<CacheInstructionDto>(sql);

            if (dtos.Count == 0)
                _lastId = -1;
        }

        /// <summary>
        /// Reads the last-synced id from file into memory.
        /// </summary>
        /// <remarks>
        /// <para>The file content is parsed with the invariant culture, matching how
        /// it is written by SaveLastSynced. An unparsable file leaves the id unchanged.</para>
        /// <para>Thread safety: this is NOT thread safe. Because it is NOT meant to run multi-threaded.</para>
        /// </remarks>
        private void ReadLastSynced()
        {
            var path = SyncFilePath;
            if (File.Exists(path) == false) return;

            var content = File.ReadAllText(path);
            int last;
            if (int.TryParse(content, NumberStyles.Integer, CultureInfo.InvariantCulture, out last))
                _lastId = last;
        }

        /// <summary>
        /// Updates the in-memory last-synced id and persists it to file.
        /// </summary>
        /// <param name="id">The id.</param>
        /// <remarks>
        /// Thread safety: this is NOT thread safe. Because it is NOT meant to run multi-threaded.
        /// </remarks>
        private void SaveLastSynced(int id)
        {
            File.WriteAllText(SyncFilePath, id.ToString(CultureInfo.InvariantCulture));
            _lastId = id;
        }

        /// <summary>
        /// Gets the unique local identity of the executing AppDomain.
        /// </summary>
        /// <remarks>
        /// <para>It is not only about the "server" (machine name and appDomainappId), but also about
        /// an AppDomain, within a Process, on that server - because two AppDomains running at the same
        /// time on the same server (eg during a restart) are, practically, a LB setup.</para>
        /// <para>Practically, all we really need is the guid, the other infos are here for information
        /// and debugging purposes.</para>
        /// </remarks>
        protected static readonly string LocalIdentity = NetworkHelper.MachineName // eg DOMAIN\SERVER
            + "/" + HttpRuntime.AppDomainAppId // eg /LM/S3SVC/11/ROOT
            + " [P" + Process.GetCurrentProcess().Id // eg 1234
            + "/D" + AppDomain.CurrentDomain.Id // eg 22
            + "] " + Guid.NewGuid().ToString("N").ToUpperInvariant(); // make it truly unique

        /// <summary>
        /// Gets the sync file path for the local server.
        /// </summary>
        /// <value>The sync file path for the local server.</value>
        /// <remarks>Creates the containing folder if it does not exist yet.</remarks>
        private static string SyncFilePath
        {
            get
            {
                var tempFolder = IOHelper.MapPath("~/App_Data/TEMP/DistCache/" + NetworkHelper.FileSafeMachineName);
                if (Directory.Exists(tempFolder) == false)
                    Directory.CreateDirectory(tempFolder);

                return Path.Combine(tempFolder, HttpRuntime.AppDomainAppId.ReplaceNonAlphanumericChars(string.Empty) + "-lastsynced.txt");
            }
        }

        #endregion

        #region Notify refreshers

        /// <summary>
        /// Gets the cache refresher registered with the specified identifier.
        /// </summary>
        /// <exception cref="InvalidOperationException">No refresher with that identifier exists.</exception>
        private static ICacheRefresher GetRefresher(Guid id)
        {
            var refresher = CacheRefreshersResolver.Current.GetById(id);
            if (refresher == null)
                throw new InvalidOperationException("Cache refresher with ID \"" + id + "\" does not exist.");
            return refresher;
        }

        /// <summary>
        /// Gets the JSON cache refresher registered with the specified identifier.
        /// </summary>
        /// <exception cref="InvalidOperationException">No refresher exists, or it is not an <see cref="IJsonCacheRefresher"/>.</exception>
        private static IJsonCacheRefresher GetJsonRefresher(Guid id)
        {
            return GetJsonRefresher(GetRefresher(id));
        }

        /// <summary>
        /// Casts a cache refresher to <see cref="IJsonCacheRefresher"/>.
        /// </summary>
        /// <exception cref="InvalidOperationException">The refresher does not implement <see cref="IJsonCacheRefresher"/>.</exception>
        private static IJsonCacheRefresher GetJsonRefresher(ICacheRefresher refresher)
        {
            var jsonRefresher = refresher as IJsonCacheRefresher;
            if (jsonRefresher == null)
                throw new InvalidOperationException("Cache refresher with ID \"" + refresher.UniqueIdentifier + "\" does not implement " + typeof(IJsonCacheRefresher) + ".");
            return jsonRefresher;
        }

        /// <summary>
        /// Executes every refresh instruction found in the JSON array, recursing into nested arrays.
        /// </summary>
        /// <param name="jsonArray">Items that are either instruction objects or nested arrays of them.</param>
        private static void NotifyRefreshers(IEnumerable<JToken> jsonArray)
        {
            foreach (var jsonItem in jsonArray)
            {
                // could be a JObject in which case we can convert to a RefreshInstruction,
                // otherwise it could be another JArray - in which case we'll iterate that.
                var jsonObj = jsonItem as JObject;
                if (jsonObj != null)
                {
                    var instruction = jsonObj.ToObject<RefreshInstruction>();
                    switch (instruction.RefreshType)
                    {
                        case RefreshMethodType.RefreshAll:
                            RefreshAll(instruction.RefresherId);
                            break;
                        case RefreshMethodType.RefreshByGuid:
                            RefreshByGuid(instruction.RefresherId, instruction.GuidId);
                            break;
                        case RefreshMethodType.RefreshById:
                            RefreshById(instruction.RefresherId, instruction.IntId);
                            break;
                        case RefreshMethodType.RefreshByIds:
                            RefreshByIds(instruction.RefresherId, instruction.JsonIds);
                            break;
                        case RefreshMethodType.RefreshByJson:
                            RefreshByJson(instruction.RefresherId, instruction.JsonPayload);
                            break;
                        case RefreshMethodType.RemoveById:
                            RemoveById(instruction.RefresherId, instruction.IntId);
                            break;
                    }
                }
                else
                {
                    var jsonInnerArray = (JArray) jsonItem;
                    NotifyRefreshers(jsonInnerArray); // recurse
                }
            }
        }

        private static void RefreshAll(Guid uniqueIdentifier)
        {
            var refresher = GetRefresher(uniqueIdentifier);
            refresher.RefreshAll();
        }

        private static void RefreshByGuid(Guid uniqueIdentifier, Guid id)
        {
            var refresher = GetRefresher(uniqueIdentifier);
            refresher.Refresh(id);
        }

        private static void RefreshById(Guid uniqueIdentifier, int id)
        {
            var refresher = GetRefresher(uniqueIdentifier);
            refresher.Refresh(id);
        }

        /// <summary>
        /// Refreshes each id of a JSON-serialized integer array.
        /// </summary>
        private static void RefreshByIds(Guid uniqueIdentifier, string jsonIds)
        {
            var refresher = GetRefresher(uniqueIdentifier);
            foreach (var id in JsonConvert.DeserializeObject<int[]>(jsonIds))
                refresher.Refresh(id);
        }

        private static void RefreshByJson(Guid uniqueIdentifier, string jsonPayload)
        {
            var refresher = GetJsonRefresher(uniqueIdentifier);
            refresher.Refresh(jsonPayload);
        }

        private static void RemoveById(Guid uniqueIdentifier, int id)
        {
            var refresher = GetRefresher(uniqueIdentifier);
            refresher.Remove(id);
        }

        #endregion
    }
}