Merge origin/dev-v7-deploy into dev-v8-zbwip
This commit is contained in:
@@ -3,9 +3,11 @@ using System.Collections.Generic;
|
||||
using System.Data;
|
||||
using System.Data.Common;
|
||||
using System.Data.SqlClient;
|
||||
using System.Data.SqlServerCe;
|
||||
using System.Linq;
|
||||
using System.Text.RegularExpressions;
|
||||
using NPoco;
|
||||
using StackExchange.Profiling.Data;
|
||||
using Umbraco.Core.Persistence.SqlSyntax;
|
||||
|
||||
namespace Umbraco.Core.Persistence
|
||||
@@ -158,137 +160,270 @@ namespace Umbraco.Core.Persistence
|
||||
|
||||
// todo: review NPoco native InsertBulk to replace the code below
|
||||
|
||||
public static void BulkInsertRecordsWithTransaction<T>(this IDatabase db, ISqlSyntaxProvider sqlSyntax, IEnumerable<T> records)
|
||||
/// <summary>
|
||||
/// Bulk-inserts records within a transaction.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the records.</typeparam>
|
||||
/// <param name="database">The database.</param>
|
||||
/// <param name="records">The records.</param>
|
||||
/// <param name="useNativeBulkInsert">Whether to use native bulk insert when available.</param>
|
||||
public static void BulkInsertRecordsWithTransaction<T>(this Database database, IEnumerable<T> records, bool useNativeBulkInsert = true)
|
||||
{
|
||||
var recordsA = records.ToArray();
|
||||
if (recordsA.Length == 0)
|
||||
return;
|
||||
|
||||
// no need to "try...catch", if the transaction is not completed it will rollback!
|
||||
using (var tr = db.GetTransaction())
|
||||
using (var tr = database.GetTransaction())
|
||||
{
|
||||
db.BulkInsertRecords(sqlSyntax, recordsA);
|
||||
database.BulkInsertRecords(recordsA, useNativeBulkInsert);
|
||||
tr.Complete();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Performs the bulk insertion in the context of a current transaction with an optional parameter to complete the transaction
|
||||
/// when finished
|
||||
/// Bulk-inserts records.
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="db"></param>
|
||||
/// <param name="sqlSyntax"></param>
|
||||
/// <param name="records"></param>
|
||||
public static void BulkInsertRecords<T>(this IDatabase db, ISqlSyntaxProvider sqlSyntax, IEnumerable<T> records)
|
||||
/// <typeparam name="T">The type of the records.</typeparam>
|
||||
/// <param name="database">The database.</param>
|
||||
/// <param name="records">The records.</param>
|
||||
/// <param name="useNativeBulkInsert">Whether to use native bulk insert when available.</param>
|
||||
/// <returns>The number of records that were inserted.</returns>
|
||||
public static int BulkInsertRecords<T>(this Database database, IEnumerable<T> records, bool useNativeBulkInsert = true)
|
||||
{
|
||||
var recordsA = records.ToArray();
|
||||
if (recordsA.Length == 0)
|
||||
return;
|
||||
if (recordsA.Length == 0) return 0;
|
||||
|
||||
// if it is sql ce or it is a sql server version less than 2008, we need to do individual inserts.
|
||||
var sqlServerSyntax = sqlSyntax as SqlServerSyntaxProvider;
|
||||
if ((sqlServerSyntax != null && (int) sqlServerSyntax.ServerVersion.ProductVersionName < (int) SqlServerSyntaxProvider.VersionName.V2008)
|
||||
|| sqlSyntax is SqlCeSyntaxProvider)
|
||||
{
|
||||
// SqlCe doesn't support bulk insert statements!
|
||||
foreach (var poco in recordsA)
|
||||
db.Insert(poco);
|
||||
}
|
||||
else
|
||||
{
|
||||
string[] sqlStatements;
|
||||
var cmds = db.GenerateBulkInsertCommand(recordsA, db.Connection, out sqlStatements);
|
||||
for (var i = 0; i < sqlStatements.Length; i++)
|
||||
{
|
||||
using (var cmd = cmds[i])
|
||||
{
|
||||
cmd.CommandText = sqlStatements[i];
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
var pocoData = database.PocoDataFactory.ForType(typeof(T));
|
||||
if (pocoData == null) throw new InvalidOperationException("Could not find PocoData for " + typeof(T));
|
||||
|
||||
private static bool IncludeColumn(PocoData pocoData, string columnKey, PocoColumn column)
|
||||
{
|
||||
// exclude result columns,
|
||||
// exclude primary key column if auto-increment
|
||||
return column.ResultColumn == false && (pocoData.TableInfo.AutoIncrement == false || columnKey != pocoData.TableInfo.PrimaryKey);
|
||||
if (database.DatabaseType.IsSqlCe())
|
||||
{
|
||||
return useNativeBulkInsert
|
||||
? BulkInsertRecordsSqlCe(database, pocoData, recordsA)
|
||||
: BulkInsertRecordsWithCommands(database, recordsA);
|
||||
}
|
||||
|
||||
if (database.DatabaseType.IsSqlServer())
|
||||
{
|
||||
return useNativeBulkInsert && database.DatabaseType.IsSqlServer2008OrLater()
|
||||
? BulkInsertRecordsSqlServer(database, pocoData, recordsA)
|
||||
: BulkInsertRecordsWithCommands(database, recordsA);
|
||||
}
|
||||
|
||||
if (database.DatabaseType.IsMySql())
|
||||
return BulkInsertRecordsWithCommands(database, recordsA);
|
||||
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a bulk insert command
|
||||
/// Bulk-insert records using commands.
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="db"></param>
|
||||
/// <param name="records"></param>
|
||||
/// <param name="connection"></param>
|
||||
/// <param name="sql"></param>
|
||||
/// <returns>Sql commands with populated command parameters required to execute the sql statement</returns>
|
||||
/// <remarks>
|
||||
/// The limits for number of parameters are 2100 (in sql server, I think there's many more allowed in mysql). So
|
||||
/// we need to detect that many params and split somehow.
|
||||
/// For some reason the 2100 limit is not actually allowed even though the exception from sql server mentions 2100 as a max, perhaps it is 2099
|
||||
/// that is max. I've reduced it to 2000 anyways.
|
||||
/// </remarks>
|
||||
internal static IDbCommand[] GenerateBulkInsertCommand<T>(
|
||||
this IDatabase db,
|
||||
T[] records,
|
||||
DbConnection connection,
|
||||
out string[] sql)
|
||||
/// <typeparam name="T">The type of the records.</typeparam>
|
||||
/// <param name="database">The database.</param>
|
||||
/// <param name="records">The records.</param>
|
||||
/// <returns>The number of records that were inserted.</returns>
|
||||
private static int BulkInsertRecordsWithCommands<T>(Database database, T[] records)
|
||||
{
|
||||
var pocoData = db.PocoDataFactory.ForType(typeof(T));
|
||||
foreach (var command in database.GenerateBulkInsertCommands(records))
|
||||
command.ExecuteNonQuery();
|
||||
|
||||
return records.Length; // what else?
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates bulk-insert commands.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the records.</typeparam>
|
||||
/// <param name="database">The database.</param>
|
||||
/// <param name="records">The records.</param>
|
||||
/// <returns>The sql commands to execute.</returns>
|
||||
internal static IDbCommand[] GenerateBulkInsertCommands<T>(this Database database, T[] records)
|
||||
{
|
||||
var pocoData = database.PocoDataFactory.ForType(typeof(T));
|
||||
|
||||
// get columns to include, = number of parameters per row
|
||||
var columns = pocoData.Columns.Where(c => IncludeColumn(pocoData, c.Key, c.Value)).ToArray();
|
||||
var paramsPerRow = columns.Length;
|
||||
var columns = pocoData.Columns.Where(c => IncludeColumn(pocoData, c)).ToArray();
|
||||
var paramsPerRecord = columns.Length;
|
||||
|
||||
// format columns to sql
|
||||
var tableName = db.DatabaseType.EscapeTableName(pocoData.TableInfo.TableName);
|
||||
var columnNames = string.Join(", ", columns.Select(c => tableName + "." + db.DatabaseType.EscapeSqlIdentifier(c.Key)));
|
||||
var tableName = database.DatabaseType.EscapeTableName(pocoData.TableInfo.TableName);
|
||||
var columnNames = string.Join(", ", columns.Select(c => tableName + "." + database.DatabaseType.EscapeSqlIdentifier(c.Key)));
|
||||
|
||||
// example calc:
|
||||
// given: we have 4168 items in the collection, each item contains 8 command parameters (values to be inserted)
|
||||
// 2100 / 8 = 262.5
|
||||
// Math.Floor(2100 / 8) = 262 items per trans
|
||||
// 4168 / 262 = 15.908... = there will be 16 trans in total
|
||||
// example:
|
||||
// assume 4168 records, each record containing 8 fields, ie 8 command parameters
|
||||
// max 2100 parameter per command
|
||||
// Math.Floor(2100 / 8) = 262 record per command
|
||||
// 4168 / 262 = 15.908... = there will be 16 command in total
|
||||
// (if we have disabled db parameters, then all records will be included, in only one command)
|
||||
var recordsPerCommand = paramsPerRecord == 0 ? int.MaxValue : Convert.ToInt32(Math.Floor(2000.00 / paramsPerRecord));
|
||||
var commandsCount = Convert.ToInt32(Math.Ceiling((double)records.Length / recordsPerCommand));
|
||||
|
||||
// if we have disabled db parameters, then all items will be included, in only one transaction
|
||||
var rowsPerCommand = paramsPerRow == 0 ? int.MaxValue : Convert.ToInt32(Math.Floor(2000.00 / paramsPerRow));
|
||||
var commandsCount = Convert.ToInt32(Math.Ceiling((double) records.Length / rowsPerCommand));
|
||||
|
||||
sql = new string[commandsCount];
|
||||
var commands = new IDbCommand[commandsCount];
|
||||
|
||||
var recordsIndex = 0;
|
||||
var recordsLeftToInsert = records.Length;
|
||||
var prefix = database.DatabaseType.GetParameterPrefix(database.ConnectionString);
|
||||
for (var commandIndex = 0; commandIndex < commandsCount; commandIndex++)
|
||||
{
|
||||
var itemsForTrans = records
|
||||
.Skip(commandIndex * rowsPerCommand)
|
||||
.Take(rowsPerCommand);
|
||||
|
||||
var cmd = db.CreateCommand(connection, CommandType.Text, "");
|
||||
var prefix = db.DatabaseType.GetParameterPrefix(cmd.Connection.ConnectionString);
|
||||
var pocoValues = new List<string>();
|
||||
var index = 0;
|
||||
foreach (var poco in itemsForTrans)
|
||||
var command = database.CreateCommand(database.Connection, CommandType.Text, string.Empty);
|
||||
var parameterIndex = 0;
|
||||
var commandRecords = Math.Min(recordsPerCommand, recordsLeftToInsert);
|
||||
var recordsValues = new string[commandRecords];
|
||||
for (var commandRecordIndex = 0; commandRecordIndex < commandRecords; commandRecordIndex++, recordsIndex++, recordsLeftToInsert--)
|
||||
{
|
||||
var values = new List<string>();
|
||||
foreach (var column in columns)
|
||||
var record = records[recordsIndex];
|
||||
var recordValues = new string[columns.Length];
|
||||
for (var columnIndex = 0; columnIndex < columns.Length; columnIndex++)
|
||||
{
|
||||
db.AddParameter(cmd, column.Value.GetValue(poco));
|
||||
values.Add(prefix + index++);
|
||||
database.AddParameter(command, columns[columnIndex].Value.GetValue(record));
|
||||
recordValues[columnIndex] = prefix + parameterIndex++;
|
||||
}
|
||||
pocoValues.Add("(" + string.Join(",", values.ToArray()) + ")");
|
||||
recordsValues[commandRecordIndex] = "(" + string.Join(",", recordValues) + ")";
|
||||
}
|
||||
|
||||
sql[commandIndex] = $"INSERT INTO {tableName} ({columnNames}) VALUES {string.Join(", ", pocoValues)}";
|
||||
commands[commandIndex] = cmd;
|
||||
command.CommandText = $"INSERT INTO {tableName} ({columnNames}) VALUES {string.Join(", ", recordsValues)}";
|
||||
commands[commandIndex] = command;
|
||||
}
|
||||
|
||||
return commands;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Determines whether a column should be part of a bulk-insert.
|
||||
/// </summary>
|
||||
/// <param name="pocoData">The PocoData object corresponding to the record's type.</param>
|
||||
/// <param name="column">The column.</param>
|
||||
/// <returns>A value indicating whether the column should be part of the bulk-insert.</returns>
|
||||
/// <remarks>Columns that are primary keys and auto-incremental, or result columns, are excluded from bulk-inserts.</remarks>
|
||||
private static bool IncludeColumn(PocoData pocoData, KeyValuePair<string, PocoColumn> column)
|
||||
{
|
||||
return column.Value.ResultColumn == false
|
||||
&& (pocoData.TableInfo.AutoIncrement == false || column.Key != pocoData.TableInfo.PrimaryKey);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bulk-insert records using SqlCE TableDirect method.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the records.</typeparam>
|
||||
/// <param name="database">The database.</param>
|
||||
/// <param name="pocoData">The PocoData object corresponding to the record's type.</param>
|
||||
/// <param name="records">The records.</param>
|
||||
/// <returns>The number of records that were inserted.</returns>
|
||||
internal static int BulkInsertRecordsSqlCe<T>(Database database, PocoData pocoData, IEnumerable<T> records)
|
||||
{
|
||||
var columns = pocoData.Columns.ToArray();
|
||||
|
||||
// create command against the original database.Connection
|
||||
using (var command = database.CreateCommand(database.Connection, CommandType.TableDirect, string.Empty))
|
||||
{
|
||||
command.CommandText = pocoData.TableInfo.TableName;
|
||||
// fixme - not supporting transactions?
|
||||
//cmd.Transaction = GetTypedTransaction<SqlCeTransaction>(db.Connection.);
|
||||
|
||||
// fixme - why the double 'using' here?
|
||||
var count = 0;
|
||||
using (var tCommand = GetTypedCommand<SqlCeCommand>(command)) // execute on the real command
|
||||
{
|
||||
// seems to cause problems, I think this is primarily used for retrieval, not inserting.
|
||||
// see: https://msdn.microsoft.com/en-us/library/system.data.sqlserverce.sqlcecommand.indexname%28v=vs.100%29.aspx?f=255&MSPPError=-2147217396
|
||||
//sqlCeCommand.IndexName = pd.TableInfo.PrimaryKey;
|
||||
|
||||
using (var resultSet = tCommand.ExecuteResultSet(ResultSetOptions.Updatable))
|
||||
{
|
||||
var updatableRecord = resultSet.CreateRecord();
|
||||
foreach (var record in records)
|
||||
{
|
||||
for (var i = 0; i < columns.Length; i++)
|
||||
{
|
||||
// skip the index if this shouldn't be included (i.e. PK)
|
||||
if (IncludeColumn(pocoData, columns[i]))
|
||||
{
|
||||
var val = columns[i].Value.GetValue(record);
|
||||
updatableRecord.SetValue(i, val);
|
||||
}
|
||||
}
|
||||
resultSet.Insert(updatableRecord);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bulk-insert records using SqlServer BulkCopy method.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the records.</typeparam>
|
||||
/// <param name="database">The database.</param>
|
||||
/// <param name="pocoData">The PocoData object corresponding to the record's type.</param>
|
||||
/// <param name="records">The records.</param>
|
||||
/// <returns>The number of records that were inserted.</returns>
|
||||
internal static int BulkInsertRecordsSqlServer<T>(Database database, PocoData pocoData, IEnumerable<T> records)
|
||||
{
|
||||
// create command against the original database.Connection
|
||||
using (var command = database.CreateCommand(database.Connection, CommandType.Text, string.Empty))
|
||||
{
|
||||
// use typed connection and transactionf or SqlBulkCopy
|
||||
var tConnection = GetTypedConnection<SqlConnection>(database.Connection);
|
||||
var tTransaction = GetTypedTransaction<SqlTransaction>(command.Transaction);
|
||||
var tableName = pocoData.TableInfo.TableName;
|
||||
|
||||
var umbracoDatabase = database as UmbracoDatabase;
|
||||
if (umbracoDatabase == null) throw new NotSupportedException("Database must be UmbracoDatabase.");
|
||||
var syntax = umbracoDatabase.SqlSyntax as SqlServerSyntaxProvider;
|
||||
if (syntax == null) throw new NotSupportedException("SqlSyntax must be SqlServerSyntaxProvider.");
|
||||
|
||||
using (var copy = new SqlBulkCopy(tConnection, SqlBulkCopyOptions.Default, tTransaction) { BulkCopyTimeout = 10000, DestinationTableName = tableName })
|
||||
using (var bulkReader = new PocoDataDataReader<T, SqlServerSyntaxProvider>(records, pocoData, syntax))
|
||||
{
|
||||
copy.WriteToServer(bulkReader);
|
||||
return bulkReader.RecordsAffected;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the underlying connection as a typed connection - this is used to unwrap the profiled mini profiler stuff
|
||||
/// </summary>
|
||||
/// <typeparam name="TConnection"></typeparam>
|
||||
/// <param name="connection"></param>
|
||||
/// <returns></returns>
|
||||
private static TConnection GetTypedConnection<TConnection>(IDbConnection connection)
|
||||
where TConnection : class, IDbConnection
|
||||
{
|
||||
var profiled = connection as ProfiledDbConnection;
|
||||
return profiled == null ? connection as TConnection : profiled.InnerConnection as TConnection;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the underlying transaction as a typed transaction - this is used to unwrap the profiled mini profiler stuff
|
||||
/// </summary>
|
||||
/// <typeparam name="TTransaction"></typeparam>
|
||||
/// <param name="transaction"></param>
|
||||
/// <returns></returns>
|
||||
private static TTransaction GetTypedTransaction<TTransaction>(IDbTransaction transaction)
|
||||
where TTransaction : class, IDbTransaction
|
||||
{
|
||||
var profiled = transaction as ProfiledDbTransaction;
|
||||
return profiled == null ? transaction as TTransaction : profiled.WrappedTransaction as TTransaction;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the underlying command as a typed command - this is used to unwrap the profiled mini profiler stuff
|
||||
/// </summary>
|
||||
/// <typeparam name="TCommand"></typeparam>
|
||||
/// <param name="command"></param>
|
||||
/// <returns></returns>
|
||||
private static TCommand GetTypedCommand<TCommand>(IDbCommand command)
|
||||
where TCommand : class, IDbCommand
|
||||
{
|
||||
var profiled = command as ProfiledDbCommand;
|
||||
return profiled == null ? command as TCommand : profiled.InternalCommand as TCommand;
|
||||
}
|
||||
|
||||
public static void TruncateTable(this IDatabase db, ISqlSyntaxProvider sqlSyntax, string tableName)
|
||||
{
|
||||
var sql = new Sql(string.Format(
|
||||
|
||||
Reference in New Issue
Block a user