Cleanup - NPoco bulk inserts

This commit is contained in:
Stephan
2016-06-08 09:59:41 +02:00
parent d9fe474269
commit ded5848e4c
2 changed files with 57 additions and 75 deletions

View File

@@ -53,6 +53,7 @@ namespace Umbraco.Core.Persistence
// not sure if that can have unexpected consequences on our code?
//
// so... for the time being, let's do with that somewhat crazy solution below...
// todo: use the proper database syntax, not this kludge
/// <summary>
/// Safely inserts a record, or updates if it exists, based on a unique constraint.
@@ -99,7 +100,7 @@ namespace Umbraco.Core.Persistence
where T : class
{
if (poco == null)
throw new ArgumentNullException("poco");
throw new ArgumentNullException(nameof(poco));
// try to update
var rowCount = updateCommand.IsNullOrWhiteSpace()
@@ -120,14 +121,11 @@ namespace Umbraco.Core.Persistence
db.Insert(poco);
return RecordPersistenceType.Insert;
}
catch (SqlException) // TODO: need to find out if all db will throw that exception - probably OK
catch (SqlException) // assuming all db engines will throw that exception
{
// failed: exists (due to race cond RC1)
// RC2 race cond here: another thread may remove the record
// fixme - debugging, ok?
throw;
// try to update
rowCount = updateCommand.IsNullOrWhiteSpace()
? db.Update(poco)
@@ -151,28 +149,25 @@ namespace Umbraco.Core.Persistence
/// <returns></returns>
public static string EscapeAtSymbols(string value)
{
if (value.Contains("@"))
{
//this fancy regex will only match a single @ not a double, etc...
var regex = new Regex("(?<!@)@(?!@)");
return regex.Replace(value, "@@");
}
return value;
if (value.Contains("@") == false) return value;
//this fancy regex will only match a single @ not a double, etc...
var regex = new Regex("(?<!@)@(?!@)");
return regex.Replace(value, "@@");
}
// fixme - NPoco has BulkInsert now?
// todo: review NPoco native InsertBulk to replace the code below
public static void BulkInsertRecordsWithTransaction<T>(this IDatabase db, ISqlSyntaxProvider sqlSyntax, IEnumerable<T> collection)
public static void BulkInsertRecordsWithTransaction<T>(this IDatabase db, ISqlSyntaxProvider sqlSyntax, IEnumerable<T> records)
{
//don't do anything if there are no records.
if (collection.Any() == false)
var recordsA = records.ToArray();
if (recordsA.Length == 0)
return;
// no need to "try...catch", if the transaction is not completed it will rollback!
using (var tr = db.GetTransaction())
{
db.BulkInsertRecords(sqlSyntax, collection);
db.BulkInsertRecords(sqlSyntax, recordsA);
tr.Complete();
}
}
@@ -184,29 +179,26 @@ namespace Umbraco.Core.Persistence
/// <typeparam name="T"></typeparam>
/// <param name="db"></param>
/// <param name="sqlSyntax"></param>
/// <param name="collection"></param>
public static void BulkInsertRecords<T>(this IDatabase db, ISqlSyntaxProvider sqlSyntax, IEnumerable<T> collection)
/// <param name="records"></param>
public static void BulkInsertRecords<T>(this IDatabase db, ISqlSyntaxProvider sqlSyntax, IEnumerable<T> records)
{
//don't do anything if there are no records.
if (collection.Any() == false)
var recordsA = records.ToArray();
if (recordsA.Length == 0)
return;
//if it is sql ce or it is a sql server version less than 2008, we need to do individual inserts.
// if it is sql ce or it is a sql server version less than 2008, we need to do individual inserts.
var sqlServerSyntax = sqlSyntax as SqlServerSyntaxProvider;
if ((sqlServerSyntax != null && (int) sqlServerSyntax.ServerVersion.ProductVersionName < (int) SqlServerSyntaxProvider.VersionName.V2008)
|| sqlSyntax is SqlCeSyntaxProvider)
{
//SqlCe doesn't support bulk insert statements!
foreach (var poco in collection)
{
// SqlCe doesn't support bulk insert statements!
foreach (var poco in recordsA)
db.Insert(poco);
}
}
else
{
string[] sqlStatements;
var cmds = db.GenerateBulkInsertCommand(collection, db.Connection, out sqlStatements);
var cmds = db.GenerateBulkInsertCommand(recordsA, db.Connection, out sqlStatements);
for (var i = 0; i < sqlStatements.Length; i++)
{
using (var cmd = cmds[i])
@@ -218,12 +210,20 @@ namespace Umbraco.Core.Persistence
}
}
/// <summary>
/// Determines whether a column takes part in the generated bulk INSERT statement.
/// </summary>
/// <param name="pocoData">The poco metadata; its TableInfo supplies the primary key name and the auto-increment flag.</param>
/// <param name="columnKey">The key under which the column is registered in the poco columns.</param>
/// <param name="column">The column definition.</param>
/// <returns>
/// <c>false</c> for a result column, and for the primary key of an auto-increment table;
/// <c>true</c> for every other column.
/// </returns>
private static bool IncludeColumn(PocoData pocoData, string columnKey, PocoColumn column)
{
    // result columns are never part of the insert
    if (column.ResultColumn) return false;

    // only the primary key of an auto-increment table is skipped: a primary key that is
    // not auto-incremented must be inserted, and so must every non-key column of an
    // auto-increment table
    var isAutoIncrementedKey = pocoData.TableInfo.AutoIncrement
        && columnKey == pocoData.TableInfo.PrimaryKey;

    return isAutoIncrementedKey == false;
}
/// <summary>
/// Creates a bulk insert command
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="db"></param>
/// <param name="collection"></param>
/// <param name="records"></param>
/// <param name="connection"></param>
/// <param name="sql"></param>
/// <returns>Sql commands with populated command parameters required to execute the sql statement</returns>
@@ -235,76 +235,59 @@ namespace Umbraco.Core.Persistence
/// </remarks>
internal static IDbCommand[] GenerateBulkInsertCommand<T>(
this IDatabase db,
IEnumerable<T> collection,
T[] records,
DbConnection connection,
out string[] sql)
{
//A filter used below a few times to get all columns except result cols and not the primary key if it is auto-incremental
Func<PocoData, KeyValuePair<string, PocoColumn>, bool> includeColumn = (data, column) =>
{
if (column.Value.ResultColumn) return false;
if (data.TableInfo.AutoIncrement && column.Key == data.TableInfo.PrimaryKey) return false;
return true;
};
var pocoData = db.PocoDataFactory.ForType(typeof(T));
var pd = db.PocoDataFactory.ForType(typeof(T));
var tableName = db.DatabaseType.EscapeTableName(pd.TableInfo.TableName);
// get columns to include, = number of parameters per row
var columns = pocoData.Columns.Where(c => IncludeColumn(pocoData, c.Key, c.Value)).ToArray();
var paramsPerRow = columns.Length;
//get all columns to include and format for sql
var cols = string.Join(", ",
pd.Columns
.Where(c => includeColumn(pd, c))
.Select(c => tableName + "." + db.DatabaseType.EscapeSqlIdentifier(c.Key)).ToArray());
// format columns to sql
var tableName = db.DatabaseType.EscapeTableName(pocoData.TableInfo.TableName);
var columnNames = string.Join(", ", columns.Select(c => tableName + "." + db.DatabaseType.EscapeSqlIdentifier(c.Key)));
var itemArray = collection.ToArray();
//calculate number of parameters per item
var paramsPerItem = pd.Columns.Count(i => includeColumn(pd, i));
//Example calc:
// Given: we have 4168 items in the itemArray, each item contains 8 command parameters (values to be inserterted)
// example calc:
// given: we have 4168 items in the collection, each item contains 8 command parameters (values to be inserted)
// 2000 / 8 = 250
// Math.Floor(2000 / 8) = 250 rows per command
// 4168 / 250 = 16.672 = there will be 17 commands in total
//all items will be included if we have disabled db parameters
var itemsPerTrans = Math.Floor(2000.00 / paramsPerItem);
//there will only be one transaction if we have disabled db parameters
var numTrans = Math.Ceiling(itemArray.Length / itemsPerTrans);
// if we have disabled db parameters, then all items will be included, in only one transaction
var rowsPerCommand = Convert.ToInt32(Math.Floor(2000.00 / paramsPerRow));
var commandsCount = Convert.ToInt32(Math.Ceiling((double) records.Length / rowsPerCommand));
var sqlQueries = new List<string>();
var commands = new List<IDbCommand>();
sql = new string[commandsCount];
var commands = new IDbCommand[commandsCount];
for (var tIndex = 0; tIndex < numTrans; tIndex++)
for (var commandIndex = 0; commandIndex < commandsCount; commandIndex++)
{
var itemsForTrans = itemArray
.Skip(tIndex * (int)itemsPerTrans)
.Take((int)itemsPerTrans);
var itemsForTrans = records
.Skip(commandIndex * rowsPerCommand)
.Take(rowsPerCommand);
var cmd = db.CreateCommand(connection, "");
var prefix = db.DatabaseType.GetParameterPrefix(cmd.Connection.ConnectionString);
var pocoValues = new List<string>();
var index = 0;
foreach (var poco in itemsForTrans)
{
var values = new List<string>();
//get all columns except result cols and not the primary key if it is auto-incremental
var prefix = db.DatabaseType.GetParameterPrefix(cmd.Connection.ConnectionString);
foreach (var i in pd.Columns.Where(x => includeColumn(pd, x)))
foreach (var column in columns)
{
db.AddParameter(cmd, i.Value.GetValue(poco));
db.AddParameter(cmd, column.Value.GetValue(poco));
values.Add(prefix + index++);
}
pocoValues.Add("(" + string.Join(",", values.ToArray()) + ")");
}
var sqlResult = string.Format("INSERT INTO {0} ({1}) VALUES {2}", tableName, cols, string.Join(", ", pocoValues));
sqlQueries.Add(sqlResult);
commands.Add(cmd);
sql[commandIndex] = $"INSERT INTO {tableName} ({columnNames}) VALUES {string.Join(", ", pocoValues)}";
commands[commandIndex] = cmd;
}
sql = sqlQueries.ToArray();
return commands.ToArray();
return commands;
}
public static void TruncateTable(this IDatabase db, ISqlSyntaxProvider sqlSyntax, string tableName)
@@ -318,13 +301,12 @@ namespace Umbraco.Core.Persistence
public static IsolationLevel GetCurrentTransactionIsolationLevel(this IDatabase database)
{
var transaction = database.Transaction;
return transaction == null ? IsolationLevel.Unspecified : transaction.IsolationLevel;
return transaction?.IsolationLevel ?? IsolationLevel.Unspecified;
}
/// <summary>
/// Fetches results for a source sequence by handing it to SelectByGroups, running one query per group.
/// </summary>
/// <typeparam name="TResult">The type of the fetched items.</typeparam>
/// <typeparam name="TSource">The type of the source items.</typeparam>
/// <param name="db">The database used to run each query.</param>
/// <param name="source">The source items.</param>
/// <param name="groupSize">The group size passed to SelectByGroups (presumably the maximum number of source items per query - confirm against SelectByGroups).</param>
/// <param name="sqlFactory">Builds the query for one group of source items.</param>
/// <returns>The sequence returned by SelectByGroups.</returns>
public static IEnumerable<TResult> FetchByGroups<TResult, TSource>(this IDatabase db, IEnumerable<TSource> source, int groupSize, Func<IEnumerable<TSource>, Sql<SqlContext>> sqlFactory)
    => source.SelectByGroups(
        batch =>
        {
            // build the query for this batch, then execute it
            var batchSql = sqlFactory(batch);
            return db.Fetch<TResult>(batchSql);
        },
        groupSize);
}
}

View File

@@ -77,7 +77,7 @@ namespace Umbraco.Tests.Persistence
// Act
string[] sql;
db.GenerateBulkInsertCommand(servers, db.Connection, out sql);
db.GenerateBulkInsertCommand(servers.ToArray(), db.Connection, out sql);
db.CloseSharedConnection();
// Assert
@@ -109,7 +109,7 @@ namespace Umbraco.Tests.Persistence
// Act
string[] sql;
db.GenerateBulkInsertCommand(servers, db.Connection, out sql);
db.GenerateBulkInsertCommand(servers.ToArray(), db.Connection, out sql);
db.CloseSharedConnection();
// Assert