using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Data.SqlClient;
using System.Data.SqlServerCe;
using System.Linq;
using System.Text.RegularExpressions;
using NPoco;
using StackExchange.Profiling.Data;
using Umbraco.Core.Persistence.FaultHandling;
using Umbraco.Core.Persistence.SqlSyntax;

namespace Umbraco.Core.Persistence
{
    /// <summary>
    /// Provides extension methods to NPoco Database class.
    /// </summary>
    public static class NPocoDatabaseExtensions
    {
        // NOTE
        //
        // proper way to do it with TSQL and SQLCE
        //   IF EXISTS (SELECT ... FROM table WITH (UPDLOCK,HOLDLOCK) WHERE ...)
        //   BEGIN
        //     UPDATE table SET ... WHERE ...
        //   END
        //   ELSE
        //   BEGIN
        //     INSERT INTO table (...) VALUES (...)
        //   END
        //
        // works in READ COMMITTED, TSQL & SQLCE lock the constraint even if it does not exist, so INSERT is OK
        //
        // proper way to do it with MySQL
        //   IF EXISTS (SELECT ... FROM table WHERE ... FOR UPDATE)
        //   BEGIN
        //     UPDATE table SET ... WHERE ...
        //   END
        //   ELSE
        //   BEGIN
        //     INSERT INTO table (...) VALUES (...)
        //   END
        //
        // MySQL locks the constraint ONLY if it exists, so INSERT may fail...
        //   in theory, happens in READ COMMITTED but not REPEATABLE READ
        //   http://www.percona.com/blog/2012/08/28/differences-between-read-committed-and-repeatable-read-transaction-isolation-levels/
        //   but according to
        //   http://dev.mysql.com/doc/refman/5.0/en/set-transaction.html
        //   it won't work for exact index value (only ranges) so really...
        //
        // MySQL should do
        //   INSERT INTO table (...) VALUES (...) ON DUPLICATE KEY UPDATE ...
        //
        // also the lock is released when the transaction is committed
        // not sure if that can have unexpected consequences on our code?
        //
        // so... for the time being, let's do with that somewhat crazy solution below...
        // todo: use the proper database syntax, not this kludge

        // Number of insert attempts made by InsertOrUpdate before it gives up.
        private const int MaxInsertAttempts = 4;

        // Matches an '@' that is neither preceded nor followed by another '@'.
        // Cached so the pattern is not re-parsed on every EscapeAtSymbols call.
        private static readonly Regex SingleAtSymbolRegex = new Regex("(?<!@)@(?!@)");

        /// <summary>
        /// Safely inserts a record, or updates if it exists, based on a unique constraint.
        /// </summary>
        /// <typeparam name="T">The type of the record.</typeparam>
        /// <param name="db">The database.</param>
        /// <param name="poco">The record to insert or update.</param>
        /// <returns>The action that executed, either an insert or an update. If an insert occurred and a PK value got generated, the poco object
        /// passed in will contain the updated value.</returns>
        /// <remarks>
        /// <para>We cannot rely on database-specific options such as MySql ON DUPLICATE KEY UPDATE or MSSQL MERGE WHEN MATCHED because SQLCE
        /// does not support any of them. Ideally this should be achieved with proper transaction isolation levels but that would mean revisiting
        /// isolation levels globally. We want to keep it simple for the time being and manage it manually.</para>
        /// <para>We handle it by trying to update, then insert, etc. until something works, or we get bored.</para>
        /// <para>Note that with proper transactions, if T2 begins after T1 then we are sure that the database will contain T2's value
        /// once T1 and T2 have completed. Whereas here, it could contain T1's value.</para>
        /// </remarks>
        internal static RecordPersistenceType InsertOrUpdate<T>(this IDatabase db, T poco)
            where T : class
        {
            return db.InsertOrUpdate(poco, null, null);
        }

        /// <summary>
        /// Safely inserts a record, or updates if it exists, based on a unique constraint.
        /// </summary>
        /// <typeparam name="T">The type of the record.</typeparam>
        /// <param name="db">The database.</param>
        /// <param name="poco">The record to insert or update.</param>
        /// <param name="updateCommand">An optional update command; when null or whitespace the record is updated through <c>db.Update(poco)</c>.
        /// If the entity has a composite key then you need to specify the update command explicitly.</param>
        /// <param name="updateArgs">The argument passed along with <paramref name="updateCommand"/>.</param>
        /// <returns>The action that executed, either an insert or an update. If an insert occurred and a PK value got generated, the poco object
        /// passed in will contain the updated value.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="poco"/> is null.</exception>
        /// <exception cref="DataException">The record could neither be inserted nor updated after all attempts.</exception>
        /// <remarks>
        /// <para>We cannot rely on database-specific options such as MySql ON DUPLICATE KEY UPDATE or MSSQL MERGE WHEN MATCHED because SQLCE
        /// does not support any of them. Ideally this should be achieved with proper transaction isolation levels but that would mean revisiting
        /// isolation levels globally. We want to keep it simple for the time being and manage it manually.</para>
        /// <para>We handle it by trying to update, then insert, etc. until something works, or we get bored.</para>
        /// <para>Note that with proper transactions, if T2 begins after T1 then we are sure that the database will contain T2's value
        /// once T1 and T2 have completed. Whereas here, it could contain T1's value.</para>
        /// </remarks>
        internal static RecordPersistenceType InsertOrUpdate<T>(this IDatabase db,
            T poco,
            string updateCommand,
            object updateArgs)
            where T : class
        {
            if (poco == null)
                throw new ArgumentNullException(nameof(poco));

            // try to update
            if (UpdateRecord(db, poco, updateCommand, updateArgs) > 0)
                return RecordPersistenceType.Update;

            // failed: does not exist, need to insert
            // RC1 race cond here: another thread may insert a record with the same constraint

            for (var attempt = 0; attempt < MaxInsertAttempts; attempt++)
            {
                try
                {
                    // try to insert
                    db.Insert(poco);
                    return RecordPersistenceType.Insert;
                }
                catch (SqlException) // assuming all db engines will throw that exception
                {
                    // NOTE(review): only System.Data.SqlClient.SqlException is caught here; other
                    // engines may throw a different exception type - confirm this is intended.

                    // failed: exists (due to race cond RC1)
                    // RC2 race cond here: another thread may remove the record

                    // try to update
                    if (UpdateRecord(db, poco, updateCommand, updateArgs) > 0)
                        return RecordPersistenceType.Update;

                    // failed: does not exist (due to race cond RC2), need to insert
                    // loop
                }
            }

            // this can go on forever... have to break at some point and report an error.
            throw new DataException("Record could not be inserted or updated.");
        }

        /// <summary>
        /// Updates a record, either through the default poco update or through an explicit update command.
        /// </summary>
        /// <returns>The number of rows reported by the update.</returns>
        private static int UpdateRecord<T>(IDatabase db, T poco, string updateCommand, object updateArgs)
            where T : class
        {
            // the explicit type argument matters: it selects the (sql, args) overload
            return updateCommand.IsNullOrWhiteSpace()
                ? db.Update(poco)
                : db.Update<T>(updateCommand, updateArgs);
        }

        /// <summary>
        /// This will escape single @ symbols for npoco values so it doesn't think it's a parameter
        /// </summary>
        /// <param name="value">The value to escape.</param>
        /// <returns>The value with every isolated @ doubled; a null or empty value, or a value without any @, is returned unchanged.</returns>
        public static string EscapeAtSymbols(string value)
        {
            if (string.IsNullOrEmpty(value) || value.IndexOf('@') < 0)
                return value;

            // the regex only matches a single @, not a double, etc...
            return SingleAtSymbolRegex.Replace(value, "@@");
        }

        /// <summary>
        /// Bulk-inserts records within a transaction.
        /// </summary>
        /// <typeparam name="T">The type of the records.</typeparam>
        /// <param name="database">The database.</param>
        /// <param name="records">The records.</param>
        /// <param name="useNativeBulkInsert">Whether to use native bulk insert when available.</param>
        public static void BulkInsertRecordsWithTransaction<T>(this Database database, IEnumerable<T> records, bool useNativeBulkInsert = true)
        {
            var recordsA = records.ToArray();
            if (recordsA.Length == 0)
                return;

            // no need to "try...catch", if the transaction is not completed it will rollback!
            using (var tr = database.GetTransaction())
            {
                database.BulkInsertRecords(recordsA, useNativeBulkInsert);
                tr.Complete();
            }
        }

        /// <summary>
        /// Bulk-inserts records.
        /// </summary>
        /// <typeparam name="T">The type of the records.</typeparam>
        /// <param name="database">The database.</param>
        /// <param name="records">The records.</param>
        /// <param name="useNativeBulkInsert">Whether to use native bulk insert when available.</param>
        /// <returns>The number of records that were inserted.</returns>
        /// <exception cref="InvalidOperationException">No PocoData could be found for <typeparamref name="T"/>.</exception>
        /// <exception cref="NotSupportedException">The database type is neither SqlCe, SqlServer nor MySql.</exception>
        public static int BulkInsertRecords<T>(this Database database, IEnumerable<T> records, bool useNativeBulkInsert = true)
        {
            var recordsA = records.ToArray();
            if (recordsA.Length == 0)
                return 0;

            var pocoData = database.PocoDataFactory.ForType(typeof(T));
            if (pocoData == null)
                throw new InvalidOperationException("Could not find PocoData for " + typeof(T));

            if (database.DatabaseType.IsSqlCe())
            {
                if (useNativeBulkInsert)
                    return BulkInsertRecordsSqlCe(database, pocoData, recordsA);

                // else, no other choice
                foreach (var record in recordsA)
                    database.Insert(record);
                return recordsA.Length;
            }

            if (database.DatabaseType.IsSqlServer())
            {
                return useNativeBulkInsert && database.DatabaseType.IsSqlServer2008OrLater()
                    ? BulkInsertRecordsSqlServer(database, pocoData, recordsA)
                    : BulkInsertRecordsWithCommands(database, recordsA);
            }

            if (database.DatabaseType.IsMySql())
                return BulkInsertRecordsWithCommands(database, recordsA);

            throw new NotSupportedException();
        }

        /// <summary>
        /// Bulk-insert records using commands.
        /// </summary>
        /// <typeparam name="T">The type of the records.</typeparam>
        /// <param name="database">The database.</param>
        /// <param name="records">The records.</param>
        /// <returns>The number of records that were inserted.</returns>
        private static int BulkInsertRecordsWithCommands<T>(Database database, T[] records)
        {
            var commands = database.GenerateBulkInsertCommands(records);
            try
            {
                foreach (var command in commands)
                    command.ExecuteNonQuery();
            }
            finally
            {
                // release every generated command, including those that never ran because an earlier one failed
                foreach (var command in commands)
                    command.Dispose();
            }

            return records.Length; // what else?
        }

        /// <summary>
        /// Creates bulk-insert commands.
        /// </summary>
        /// <typeparam name="T">The type of the records.</typeparam>
        /// <param name="database">The database.</param>
        /// <param name="records">The records.</param>
        /// <returns>The sql commands to execute.</returns>
        /// <remarks>The caller owns the returned commands and is responsible for disposing them.</remarks>
        internal static IDbCommand[] GenerateBulkInsertCommands<T>(this Database database, T[] records)
        {
            var pocoData = database.PocoDataFactory.ForType(typeof(T));

            // get columns to include, = number of parameters per row
            var columns = pocoData.Columns.Where(c => IncludeColumn(pocoData, c)).ToArray();
            var paramsPerRecord = columns.Length;

            // format columns to sql
            var tableName = database.DatabaseType.EscapeTableName(pocoData.TableInfo.TableName);
            var columnNames = string.Join(", ", columns.Select(c => tableName + "." + database.DatabaseType.EscapeSqlIdentifier(c.Key)));

            // example:
            // assume 4168 records, each record containing 8 fields, ie 8 command parameters
            // max 2100 parameter per command (original note), the code budgets 2000 per command
            // Math.Floor(2000 / 8) = 250 record per command
            // 4168 / 250 = 16.672 = there will be 17 command in total
            // (if we have disabled db parameters, then all records will be included, in only one command)
            var recordsPerCommand = paramsPerRecord == 0 ? int.MaxValue : Convert.ToInt32(Math.Floor(2000.00 / paramsPerRecord));
            var commandsCount = Convert.ToInt32(Math.Ceiling((double)records.Length / recordsPerCommand));

            var commands = new IDbCommand[commandsCount];
            var recordsIndex = 0;
            var recordsLeftToInsert = records.Length;
            var prefix = database.DatabaseType.GetParameterPrefix(database.ConnectionString);
            for (var commandIndex = 0; commandIndex < commandsCount; commandIndex++)
            {
                var command = database.CreateCommand(database.Connection, CommandType.Text, string.Empty);
                var parameterIndex = 0;
                var commandRecords = Math.Min(recordsPerCommand, recordsLeftToInsert);
                var recordsValues = new string[commandRecords];
                for (var commandRecordIndex = 0; commandRecordIndex < commandRecords; commandRecordIndex++, recordsIndex++, recordsLeftToInsert--)
                {
                    var record = records[recordsIndex];
                    var recordValues = new string[columns.Length];
                    for (var columnIndex = 0; columnIndex < columns.Length; columnIndex++)
                    {
                        // one parameter per column value, referenced in the sql as prefix + index
                        database.AddParameter(command, columns[columnIndex].Value.GetValue(record));
                        recordValues[columnIndex] = prefix + parameterIndex++;
                    }
                    recordsValues[commandRecordIndex] = "(" + string.Join(",", recordValues) + ")";
                }

                command.CommandText = $"INSERT INTO {tableName} ({columnNames}) VALUES {string.Join(", ", recordsValues)}";
                commands[commandIndex] = command;
            }

            return commands;
        }

        /// <summary>
        /// Determines whether a column should be part of a bulk-insert.
        /// </summary>
        /// <param name="pocoData">The PocoData object corresponding to the record's type.</param>
        /// <param name="column">The column.</param>
        /// <returns>A value indicating whether the column should be part of the bulk-insert.</returns>
        /// <remarks>Columns that are primary keys and auto-incremental, or result columns, are excluded from bulk-inserts.</remarks>
        private static bool IncludeColumn(PocoData pocoData, KeyValuePair<string, PocoColumn> column)
        {
            return column.Value.ResultColumn == false
                && (pocoData.TableInfo.AutoIncrement == false || column.Key != pocoData.TableInfo.PrimaryKey);
        }

        /// <summary>
        /// Bulk-insert records using SqlCE TableDirect method.
        /// </summary>
        /// <typeparam name="T">The type of the records.</typeparam>
        /// <param name="database">The database.</param>
        /// <param name="pocoData">The PocoData object corresponding to the record's type.</param>
        /// <param name="records">The records.</param>
        /// <returns>The number of records that were inserted.</returns>
        internal static int BulkInsertRecordsSqlCe<T>(Database database, PocoData pocoData, IEnumerable<T> records)
        {
            var columns = pocoData.Columns.ToArray();

            // create command against the original database.Connection
            using (var command = database.CreateCommand(database.Connection, CommandType.TableDirect, string.Empty))
            {
                command.CommandText = pocoData.TableInfo.TableName;
                command.CommandType = CommandType.TableDirect; // fixme - why repeat?
                // fixme - not supporting transactions?
                //cmd.Transaction = GetTypedTransaction<SqlCeTransaction>(db.Connection.);

                var count = 0;
                var tCommand = GetTypedCommand<SqlCeCommand>(command); // execute on the real command

                // seems to cause problems, I think this is primarily used for retrieval, not inserting.
                // see: https://msdn.microsoft.com/en-us/library/system.data.sqlserverce.sqlcecommand.indexname%28v=vs.100%29.aspx?f=255&MSPPError=-2147217396
                //tCommand.IndexName = pd.TableInfo.PrimaryKey;

                using (var resultSet = tCommand.ExecuteResultSet(ResultSetOptions.Updatable))
                {
                    // a single updatable record is reused for every inserted row
                    var updatableRecord = resultSet.CreateRecord();
                    foreach (var record in records)
                    {
                        // NOTE(review): the poco column index is used as the record ordinal - presumably
                        // the poco columns are in the same order as the table columns; confirm.
                        for (var i = 0; i < columns.Length; i++)
                        {
                            // skip the index if this shouldn't be included (i.e. PK)
                            if (IncludeColumn(pocoData, columns[i]))
                            {
                                var val = columns[i].Value.GetValue(record);
                                updatableRecord.SetValue(i, val);
                            }
                        }
                        resultSet.Insert(updatableRecord);
                        count++;
                    }
                }

                return count;
            }
        }

        /// <summary>
        /// Bulk-insert records using SqlServer BulkCopy method.
        /// </summary>
        /// <typeparam name="T">The type of the records.</typeparam>
        /// <param name="database">The database.</param>
        /// <param name="pocoData">The PocoData object corresponding to the record's type.</param>
        /// <param name="records">The records.</param>
        /// <returns>The number of records that were inserted.</returns>
        /// <exception cref="NotSupportedException">The database is not an UmbracoDatabase, or its syntax provider is not a SqlServerSyntaxProvider.</exception>
        internal static int BulkInsertRecordsSqlServer<T>(Database database, PocoData pocoData, IEnumerable<T> records)
        {
            // create command against the original database.Connection
            using (var command = database.CreateCommand(database.Connection, CommandType.Text, string.Empty))
            {
                // use typed connection and transaction for SqlBulkCopy
                var tConnection = GetTypedConnection<SqlConnection>(database.Connection);
                var tTransaction = GetTypedTransaction<SqlTransaction>(command.Transaction);
                var tableName = pocoData.TableInfo.TableName;

                var umbracoDatabase = database as UmbracoDatabase;
                if (umbracoDatabase == null)
                    throw new NotSupportedException("Database must be UmbracoDatabase.");

                var syntax = umbracoDatabase.SqlSyntax as SqlServerSyntaxProvider;
                if (syntax == null)
                    throw new NotSupportedException("SqlSyntax must be SqlServerSyntaxProvider.");

                using (var copy = new SqlBulkCopy(tConnection, SqlBulkCopyOptions.Default, tTransaction) { BulkCopyTimeout = 10000, DestinationTableName = tableName })
                using (var bulkReader = new PocoDataDataReader<T, SqlServerSyntaxProvider>(records, pocoData, syntax))
                {
                    copy.WriteToServer(bulkReader);
                    return bulkReader.RecordsAffected;
                }
            }
        }

        /// <summary>
        /// Returns the underlying connection as a typed connection - this is used to unwrap the profiled mini profiler stuff
        /// </summary>
        /// <typeparam name="TConnection">The expected connection type.</typeparam>
        /// <param name="connection">The connection, possibly wrapped in a ProfiledDbConnection.</param>
        /// <returns>The typed connection, or null if the (unwrapped) connection is not a <typeparamref name="TConnection"/>.</returns>
        private static TConnection GetTypedConnection<TConnection>(IDbConnection connection)
            where TConnection : class, IDbConnection
        {
            var profiled = connection as ProfiledDbConnection;
            return profiled == null ? connection as TConnection : profiled.InnerConnection as TConnection;
        }

        /// <summary>
        /// Returns the underlying transaction as a typed transaction - this is used to unwrap the profiled mini profiler stuff
        /// </summary>
        /// <typeparam name="TTransaction">The expected transaction type.</typeparam>
        /// <param name="transaction">The transaction, possibly wrapped in a ProfiledDbTransaction.</param>
        /// <returns>The typed transaction, or null if the (unwrapped) transaction is not a <typeparamref name="TTransaction"/>.</returns>
        private static TTransaction GetTypedTransaction<TTransaction>(IDbTransaction transaction)
            where TTransaction : class, IDbTransaction
        {
            var profiled = transaction as ProfiledDbTransaction;
            return profiled == null ? transaction as TTransaction : profiled.WrappedTransaction as TTransaction;
        }

        /// <summary>
        /// Returns the underlying command as a typed command - this is used to unwrap the profiled mini profiler stuff
        /// </summary>
        /// <typeparam name="TCommand">The expected command type.</typeparam>
        /// <param name="command">The command, possibly wrapped in a FaultHandlingDbCommand and/or a ProfiledDbCommand.</param>
        /// <returns>The typed command, or null if the (unwrapped) command is not a <typeparamref name="TCommand"/>.</returns>
        private static TCommand GetTypedCommand<TCommand>(IDbCommand command)
            where TCommand : class, IDbCommand
        {
            // unwrap the fault-handling layer first, then the profiling layer
            var faultHandling = command as FaultHandlingDbCommand;
            if (faultHandling != null) command = faultHandling.Inner;
            var profiled = command as ProfiledDbCommand;
            if (profiled != null) command = profiled.InternalCommand;
            return command as TCommand;
        }

        /// <summary>
        /// Truncates a table.
        /// </summary>
        /// <param name="db">The database.</param>
        /// <param name="sqlSyntax">The syntax provider supplying the truncate statement format and the table name quoting.</param>
        /// <param name="tableName">The name of the table to truncate.</param>
        public static void TruncateTable(this IDatabase db, ISqlSyntaxProvider sqlSyntax, string tableName)
        {
            var sql = new Sql(string.Format(
                sqlSyntax.TruncateTable,
                sqlSyntax.GetQuotedTableName(tableName)));
            db.Execute(sql);
        }

        /// <summary>
        /// Gets the isolation level of the current transaction.
        /// </summary>
        /// <param name="database">The database.</param>
        /// <returns>The isolation level of the database's transaction, or <see cref="IsolationLevel.Unspecified"/> when there is no transaction.</returns>
        public static IsolationLevel GetCurrentTransactionIsolationLevel(this IDatabase database)
        {
            var transaction = database.Transaction;
            return transaction?.IsolationLevel ?? IsolationLevel.Unspecified;
        }

        /// <summary>
        /// Fetches records by groups of source items.
        /// </summary>
        /// <typeparam name="TResult">The type of the fetched records.</typeparam>
        /// <typeparam name="TSource">The type of the source items.</typeparam>
        /// <param name="db">The database.</param>
        /// <param name="source">The source items.</param>
        /// <param name="groupSize">The group size handed to SelectByGroups.</param>
        /// <param name="sqlFactory">A function building the query to fetch for one group of source items.</param>
        /// <returns>The records produced by SelectByGroups from the per-group fetches.</returns>
        /// <remarks>NOTE(review): the generic arguments of this signature were reconstructed; the factory is typed as returning
        /// the base <c>Sql</c> type, which also accepts factories returning a derived Sql type - confirm against callers.</remarks>
        public static IEnumerable<TResult> FetchByGroups<TResult, TSource>(this IDatabase db, IEnumerable<TSource> source, int groupSize, Func<IEnumerable<TSource>, Sql> sqlFactory)
        {
            return source.SelectByGroups(x => db.Fetch<TResult>(sqlFactory(x)), groupSize);
        }
    }
}