From ded5848e4c3080d913bd2368d27d6823a752acaa Mon Sep 17 00:00:00 2001 From: Stephan Date: Wed, 8 Jun 2016 09:59:41 +0200 Subject: [PATCH] Cleanup - NPoco bulk inserts --- .../Persistence/NPocoDatabaseExtensions.cs | 128 ++++++++---------- .../Persistence/NPocoExtensionsTest.cs | 4 +- 2 files changed, 57 insertions(+), 75 deletions(-) diff --git a/src/Umbraco.Core/Persistence/NPocoDatabaseExtensions.cs b/src/Umbraco.Core/Persistence/NPocoDatabaseExtensions.cs index fb7201657f..9329fa2287 100644 --- a/src/Umbraco.Core/Persistence/NPocoDatabaseExtensions.cs +++ b/src/Umbraco.Core/Persistence/NPocoDatabaseExtensions.cs @@ -53,6 +53,7 @@ namespace Umbraco.Core.Persistence // not sure if that can have unexpected consequences on our code? // // so... for the time being, let's do with that somewhat crazy solution below... + // todo: use the proper database syntax, not this kludge /// /// Safely inserts a record, or updates if it exists, based on a unique constraint. @@ -99,7 +100,7 @@ namespace Umbraco.Core.Persistence where T : class { if (poco == null) - throw new ArgumentNullException("poco"); + throw new ArgumentNullException(nameof(poco)); // try to update var rowCount = updateCommand.IsNullOrWhiteSpace() @@ -120,14 +121,11 @@ namespace Umbraco.Core.Persistence db.Insert(poco); return RecordPersistenceType.Insert; } - catch (SqlException) // TODO: need to find out if all db will throw that exception - probably OK + catch (SqlException) // assuming all db engines will throw that exception { // failed: exists (due to race cond RC1) // RC2 race cond here: another thread may remove the record - // fixme - debugging, ok? - throw; - // try to update rowCount = updateCommand.IsNullOrWhiteSpace() ? db.Update(poco) @@ -151,28 +149,25 @@ namespace Umbraco.Core.Persistence /// public static string EscapeAtSymbols(string value) { - if (value.Contains("@")) - { - //this fancy regex will only match a single @ not a double, etc... - var regex = new Regex("(?(this IDatabase db, ISqlSyntaxProvider sqlSyntax, IEnumerable collection) + public static void BulkInsertRecordsWithTransaction(this IDatabase db, ISqlSyntaxProvider sqlSyntax, IEnumerable records) { - //don't do anything if there are no records. - if (collection.Any() == false) + var recordsA = records.ToArray(); + if (recordsA.Length == 0) return; // no need to "try...catch", if the transaction is not completed it will rollback! using (var tr = db.GetTransaction()) { - db.BulkInsertRecords(sqlSyntax, collection); + db.BulkInsertRecords(sqlSyntax, recordsA); tr.Complete(); } } @@ -184,29 +179,26 @@ namespace Umbraco.Core.Persistence /// /// /// - /// - public static void BulkInsertRecords(this IDatabase db, ISqlSyntaxProvider sqlSyntax, IEnumerable collection) + /// + public static void BulkInsertRecords(this IDatabase db, ISqlSyntaxProvider sqlSyntax, IEnumerable records) { - //don't do anything if there are no records. - if (collection.Any() == false) + var recordsA = records.ToArray(); + if (recordsA.Length == 0) return; - //if it is sql ce or it is a sql server version less than 2008, we need to do individual inserts. + // if it is sql ce or it is a sql server version less than 2008, we need to do individual inserts. var sqlServerSyntax = sqlSyntax as SqlServerSyntaxProvider; if ((sqlServerSyntax != null && (int) sqlServerSyntax.ServerVersion.ProductVersionName < (int) SqlServerSyntaxProvider.VersionName.V2008) || sqlSyntax is SqlCeSyntaxProvider) { - //SqlCe doesn't support bulk insert statements! - - foreach (var poco in collection) - { + // SqlCe doesn't support bulk insert statements! + foreach (var poco in recordsA) db.Insert(poco); - } } else { string[] sqlStatements; - var cmds = db.GenerateBulkInsertCommand(collection, db.Connection, out sqlStatements); + var cmds = db.GenerateBulkInsertCommand(recordsA, db.Connection, out sqlStatements); for (var i = 0; i < sqlStatements.Length; i++) { using (var cmd = cmds[i]) @@ -218,12 +210,20 @@ namespace Umbraco.Core.Persistence } } + private static bool IncludeColumn(PocoData pocoData, string columnKey, PocoColumn column) + { + return + column.ResultColumn == false + && pocoData.TableInfo.AutoIncrement == false + && columnKey != pocoData.TableInfo.PrimaryKey; + } + /// /// Creates a bulk insert command /// /// /// - /// + /// /// /// /// Sql commands with populated command parameters required to execute the sql statement @@ -235,76 +235,59 @@ namespace Umbraco.Core.Persistence /// internal static IDbCommand[] GenerateBulkInsertCommand( this IDatabase db, - IEnumerable collection, + T[] records, DbConnection connection, out string[] sql) { - //A filter used below a few times to get all columns except result cols and not the primary key if it is auto-incremental - Func, bool> includeColumn = (data, column) => - { - if (column.Value.ResultColumn) return false; - if (data.TableInfo.AutoIncrement && column.Key == data.TableInfo.PrimaryKey) return false; - return true; - }; + var pocoData = db.PocoDataFactory.ForType(typeof(T)); - var pd = db.PocoDataFactory.ForType(typeof(T)); - var tableName = db.DatabaseType.EscapeTableName(pd.TableInfo.TableName); + // get columns to include, = number of parameters per row + var columns = pocoData.Columns.Where(c => IncludeColumn(pocoData, c.Key, c.Value)).ToArray(); + var paramsPerRow = columns.Length; - //get all columns to include and format for sql - var cols = string.Join(", ", - pd.Columns - .Where(c => includeColumn(pd, c)) - .Select(c => tableName + "." + db.DatabaseType.EscapeSqlIdentifier(c.Key)).ToArray()); + // format columns to sql + var tableName = db.DatabaseType.EscapeTableName(pocoData.TableInfo.TableName); + var columnNames = string.Join(", ", columns.Select(c => tableName + "." + db.DatabaseType.EscapeSqlIdentifier(c.Key))); - var itemArray = collection.ToArray(); - - //calculate number of parameters per item - var paramsPerItem = pd.Columns.Count(i => includeColumn(pd, i)); - - //Example calc: - // Given: we have 4168 items in the itemArray, each item contains 8 command parameters (values to be inserterted) + // example calc: + // given: we have 4168 items in the collection, each item contains 8 command parameters (values to be inserted) // 2100 / 8 = 262.5 // Math.Floor(2100 / 8) = 262 items per trans // 4168 / 262 = 15.908... = there will be 16 trans in total - //all items will be included if we have disabled db parameters - var itemsPerTrans = Math.Floor(2000.00 / paramsPerItem); - //there will only be one transaction if we have disabled db parameters - var numTrans = Math.Ceiling(itemArray.Length / itemsPerTrans); + // if we have disabled db parameters, then all items will be included, in only one transaction + var rowsPerCommand = Convert.ToInt32(Math.Floor(2000.00 / paramsPerRow)); + var commandsCount = Convert.ToInt32(Math.Ceiling((double) records.Length / rowsPerCommand)); - var sqlQueries = new List(); - var commands = new List(); + sql = new string[commandsCount]; + var commands = new IDbCommand[commandsCount]; - for (var tIndex = 0; tIndex < numTrans; tIndex++) + for (var commandIndex = 0; commandIndex < commandsCount; commandIndex++) { - var itemsForTrans = itemArray - .Skip(tIndex * (int)itemsPerTrans) - .Take((int)itemsPerTrans); + var itemsForTrans = records + .Skip(commandIndex * rowsPerCommand) + .Take(rowsPerCommand); var cmd = db.CreateCommand(connection, ""); + var prefix = db.DatabaseType.GetParameterPrefix(cmd.Connection.ConnectionString); var pocoValues = new List(); var index = 0; foreach (var poco in itemsForTrans) { var values = new List(); - //get all columns except result cols and not the primary key if it is auto-incremental - var prefix = db.DatabaseType.GetParameterPrefix(cmd.Connection.ConnectionString); - foreach (var i in pd.Columns.Where(x => includeColumn(pd, x))) + foreach (var column in columns) { - db.AddParameter(cmd, i.Value.GetValue(poco)); + db.AddParameter(cmd, column.Value.GetValue(poco)); values.Add(prefix + index++); } pocoValues.Add("(" + string.Join(",", values.ToArray()) + ")"); } - var sqlResult = string.Format("INSERT INTO {0} ({1}) VALUES {2}", tableName, cols, string.Join(", ", pocoValues)); - sqlQueries.Add(sqlResult); - commands.Add(cmd); + sql[commandIndex] = $"INSERT INTO {tableName} ({columnNames}) VALUES {string.Join(", ", pocoValues)}"; + commands[commandIndex] = cmd; } - sql = sqlQueries.ToArray(); - - return commands.ToArray(); + return commands; } public static void TruncateTable(this IDatabase db, ISqlSyntaxProvider sqlSyntax, string tableName) @@ -318,13 +301,12 @@ namespace Umbraco.Core.Persistence public static IsolationLevel GetCurrentTransactionIsolationLevel(this IDatabase database) { var transaction = database.Transaction; - return transaction == null ? IsolationLevel.Unspecified : transaction.IsolationLevel; + return transaction?.IsolationLevel ?? IsolationLevel.Unspecified; } public static IEnumerable FetchByGroups(this IDatabase db, IEnumerable source, int groupSize, Func, Sql> sqlFactory) { return source.SelectByGroups(x => db.Fetch(sqlFactory(x)), groupSize); } - } } \ No newline at end of file diff --git a/src/Umbraco.Tests/Persistence/NPocoExtensionsTest.cs b/src/Umbraco.Tests/Persistence/NPocoExtensionsTest.cs index d8f781ac77..45a98ed3c7 100644 --- a/src/Umbraco.Tests/Persistence/NPocoExtensionsTest.cs +++ b/src/Umbraco.Tests/Persistence/NPocoExtensionsTest.cs @@ -77,7 +77,7 @@ namespace Umbraco.Tests.Persistence // Act string[] sql; - db.GenerateBulkInsertCommand(servers, db.Connection, out sql); + db.GenerateBulkInsertCommand(servers.ToArray(), db.Connection, out sql); db.CloseSharedConnection(); // Assert @@ -109,7 +109,7 @@ namespace Umbraco.Tests.Persistence // Act string[] sql; - db.GenerateBulkInsertCommand(servers, db.Connection, out sql); + db.GenerateBulkInsertCommand(servers.ToArray(), db.Connection, out sql); db.CloseSharedConnection(); // Assert