Bulk inserting large volumes of data in C# for several databases (SqlServer, Oracle, SQLite, and MySql)

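I used to think that only SqlServer supported bulk insert. It turns out that Oracle, SQLite, and MySql support it too, although Oracle requires the Oracle.DataAccess driver. This post shows a bulk insert solution for each of these databases.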

First, IProvider has a plugin service interface for bulk insert, IBatcherProvider, which was already mentioned in the previous article.


using System.Data;

/// <summary>
/// Provides methods for processing data in batches.
/// </summary>
public interface IBatcherProvider : IProviderService
{
    /// <summary>
    /// Bulk inserts the data of a <see cref="DataTable"/> into the database.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to bulk insert.</param>
    /// <param name="batchSize">The number of rows written per batch.</param>
    void Insert(DataTable dataTable, int batchSize = 10000);
}

1. SqlServer bulk insert

Bulk insert on SqlServer is simple: just use SqlBulkCopy. Here is the implementation of the class:


using System;
using System.Data;
using System.Data.SqlClient;

/// <summary>
/// Provides bulk operations for System.Data.SqlClient.
/// </summary>
public sealed class MsSqlBatcher : IBatcherProvider
{
    /// <summary>
    /// Gets or sets the context of the provider service.
    /// </summary>
    public ServiceContext ServiceContext { get; set; }

    /// <summary>
    /// Bulk inserts the data of a <see cref="DataTable"/> into the database.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to bulk insert.</param>
    /// <param name="batchSize">The number of rows written per batch.</param>
    public void Insert(DataTable dataTable, int batchSize = 10000)
    {
        Checker.ArgumentNull(dataTable, "dataTable");
        if (dataTable.Rows.Count == 0)
        {
            return;
        }
        using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection())
        {
            try
            {
                connection.TryOpen();
                // Wrap the table name in the database's quote characters.
                var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName);
                using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null)
                    {
                        DestinationTableName = tableName,
                        BatchSize = batchSize
                    })
                {
                    // Loop over the columns and add a mapping to bulk for each one.
                    dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement);
                    bulk.WriteToServer(dataTable);
                    bulk.Close();
                }
            }
            catch (Exception exp)
            {
                throw new BatcherException(exp);
            }
            finally
            {
                connection.TryClose();
            }
        }
    }
}

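The code above does not use a transaction, because a transaction has some impact on performance. If you need one, set SqlBulkCopyOptions.UseInternalTransaction, which runs each batch of the bulk copy in its own transaction.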
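As a rough illustration of the transaction option, here is a minimal sketch that uses SqlBulkCopy directly, without the framework types from the listing. The class name SqlBulkCopyTransactionSketch, its method names, and the connection string parameter are assumptions made for this example; it also assumes a reference to System.Data.SqlClient and a destination table whose name matches table.TableName.

```csharp
using System.Data;
using System.Data.SqlClient;

public static class SqlBulkCopyTransactionSketch
{
    // KeepIdentity mirrors the article's code. UseInternalTransaction makes
    // each batch of BatchSize rows run in its own transaction.
    public static SqlBulkCopyOptions BuildOptions(bool useInternalTransaction)
    {
        var options = SqlBulkCopyOptions.KeepIdentity;
        if (useInternalTransaction)
        {
            options |= SqlBulkCopyOptions.UseInternalTransaction;
        }
        return options;
    }

    // Hypothetical helper: copies the table to the server in batches.
    public static void Insert(string connectionString, DataTable table, int batchSize, bool useInternalTransaction)
    {
        using (var bulk = new SqlBulkCopy(connectionString, BuildOptions(useInternalTransaction)))
        {
            bulk.DestinationTableName = table.TableName;
            bulk.BatchSize = batchSize;
            foreach (DataColumn column in table.Columns)
            {
                // Skip auto-increment columns, as the article's mapping does.
                if (!column.AutoIncrement)
                {
                    bulk.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                }
            }
            bulk.WriteToServer(table);
        }
    }
}
```

With the flag set, a failure in one batch rolls back only that batch; batches that were already committed stay in the table.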

2. Oracle bulk insert

System.Data.OracleClient does not support bulk insert, so the Oracle.DataAccess component has to be used as the provider.


using System;
using System.Data;
using System.Data.Common;
using System.Text;

/// <summary>
/// Provides bulk operations for the Oracle.DataAccess component.
/// </summary>
public sealed class OracleAccessBatcher : IBatcherProvider
{
    /// <summary>
    /// Gets or sets the context of the provider service.
    /// </summary>
    public ServiceContext ServiceContext { get; set; }

    /// <summary>
    /// Bulk inserts the data of a <see cref="DataTable"/> into the database.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to bulk insert.</param>
    /// <param name="batchSize">The number of rows written per batch.</param>
    public void Insert(DataTable dataTable, int batchSize = 10000)
    {
        Checker.ArgumentNull(dataTable, "dataTable");
        if (dataTable.Rows.Count == 0)
        {
            return;
        }
        using (var connection = ServiceContext.Database.CreateConnection())
        {
            try
            {
                connection.TryOpen();
                using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
                {
                    if (command == null)
                    {
                        throw new BatcherException(new ArgumentException("command"));
                    }
                    command.Connection = connection;
                    command.CommandText = GenerateInsertSql(ServiceContext.Database, command, dataTable);
                    command.ExecuteNonQuery();
                }
            }
            catch (Exception exp)
            {
                throw new BatcherException(exp);
            }
            finally
            {
                connection.TryClose();
            }
        }
    }

    /// <summary>
    /// Generates the SQL statement that inserts the data.
    /// </summary>
    /// <param name="database">The database that supplies the syntax service and the parameter factory.</param>
    /// <param name="command">The command that receives the array-bound parameters.</param>
    /// <param name="table">The table whose rows are inserted.</param>
    /// <returns>A parameterized INSERT statement.</returns>
    private string GenerateInsertSql(IDatabase database, DbCommand command, DataTable table)
    {
        var names = new StringBuilder();
        var values = new StringBuilder();
        // Convert the DataTable's data into an array of arrays.
        var data = table.ToArray();

        // Set the ArrayBindCount property through reflection, because DbCommand
        // does not declare it. All rows are bound in one call; batchSize is not used.
        command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null);

        var syntax = database.Provider.GetService<ISyntaxProvider>();
        for (var i = 0; i < table.Columns.Count; i++)
        {
            var column = table.Columns[i];

            var parameter = database.Provider.DbProviderFactory.CreateParameter();
            if (parameter == null)
            {
                continue;
            }
            parameter.ParameterName = column.ColumnName;
            parameter.Direction = ParameterDirection.Input;
            parameter.DbType = column.DataType.GetDbType();
            // The parameter value is the array of all row values for this column.
            parameter.Value = data[i];

            if (names.Length > 0)
            {
                names.Append(",");
                values.Append(",");
            }
            names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName));
            values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);

            command.Parameters.Add(parameter);
        }
        return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
    }
}

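The most important step above is converting the DataTable into an array of arrays, that is, object[][]. The outer array is indexed by column, and each inner array holds that column's value for every row. The loop over Columns therefore assigns one inner array to each Parameter, which means the value of a parameter is an array. The insert statement itself is no different from an ordinary insert statement.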
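The article does not show the ToArray extension, so the following is only a sketch of what such a column-major conversion could look like. The class and method names (DataTableColumnArrays, ToColumnArrays) are made up for this example and are not the framework's actual implementation.

```csharp
using System;
using System.Data;

public static class DataTableColumnArrays
{
    // Hypothetical stand-in for the article's table.ToArray() extension:
    // result[c][r] holds the value of column c in row r.
    public static object[][] ToColumnArrays(DataTable table)
    {
        if (table == null) throw new ArgumentNullException(nameof(table));

        var columns = new object[table.Columns.Count][];
        for (var c = 0; c < table.Columns.Count; c++)
        {
            // One inner array per column, with one slot per row.
            var values = new object[table.Rows.Count];
            for (var r = 0; r < table.Rows.Count; r++)
            {
                values[r] = table.Rows[r][c];
            }
            columns[c] = values;
        }
        return columns;
    }
}
```

For a table with columns Id and Name and the rows (1, "a") and (2, "b"), the result has two inner arrays: the first holds 1 and 2, the second holds "a" and "b". Each inner array is what gets assigned to one parameter's Value.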

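3. SQLite bulk insert

Bulk insert on SQLite only requires wrapping the inserts in a transaction. I have not dug into exactly why this works. The usual explanation is that, without an explicit transaction, every INSERT is committed on its own and typically waits for the data to reach the disk, whereas a single transaction pays that cost once.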


using System;
using System.Data;
using System.Data.Common;
using System.Text;

/// <summary>
/// Provides bulk operations for SQLite.
/// </summary>
public sealed class SQLiteBatcher : IBatcherProvider
{
    /// <summary>
    /// Gets or sets the context of the provider service.
    /// </summary>
    public ServiceContext ServiceContext { get; set; }

    /// <summary>
    /// Bulk inserts the data of a <see cref="DataTable"/> into the database.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to bulk insert.</param>
    /// <param name="batchSize">The number of rows written per batch.</param>
    public void Insert(DataTable dataTable, int batchSize = 10000)
    {
        Checker.ArgumentNull(dataTable, "dataTable");
        if (dataTable.Rows.Count == 0)
        {
            return;
        }
        using (var connection = ServiceContext.Database.CreateConnection())
        {
            DbTransaction transaction = null;
            try
            {
                connection.TryOpen();
                transaction = connection.BeginTransaction();
                using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
                {
                    if (command == null)
                    {
                        throw new BatcherException(new ArgumentException("command"));
                    }
                    command.Connection = connection;
                    // Associate the command with the transaction; some ADO.NET providers require this.
                    command.Transaction = transaction;

                    command.CommandText = GenerateInsertSql(ServiceContext.Database, dataTable);
                    if (command.CommandText == string.Empty)
                    {
                        return;
                    }

                    var flag = new AssertFlag();
                    dataTable.EachRow(row =>
                        {
                            var first = flag.AssertTrue();
                            ProcessCommandParameters(dataTable, command, row, first);
                            command.ExecuteNonQuery();
                        });
                }
                transaction.Commit();
            }
            catch (Exception exp)
            {
                if (transaction != null)
                {
                    transaction.Rollback();
                }
                throw new BatcherException(exp);
            }
            finally
            {
                connection.TryClose();
            }
        }
    }

    private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first)
    {
        for (var c = 0; c < dataTable.Columns.Count; c++)
        {
            DbParameter parameter;
            // Create the parameters only for the first row so that later rows can reuse them.
            if (first)
            {
                parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter();
                parameter.ParameterName = dataTable.Columns[c].ColumnName;
                command.Parameters.Add(parameter);
            }
            else
            {
                parameter = command.Parameters[c];
            }
            parameter.Value = row[c];
        }
    }

    /// <summary>
    /// Generates the SQL statement that inserts the data.
    /// </summary>
    /// <param name="database">The database that supplies the syntax service.</param>
    /// <param name="table">The table whose rows are inserted.</param>
    /// <returns>A parameterized INSERT statement.</returns>
    private string GenerateInsertSql(IDatabase database, DataTable table)
    {
        var syntax = database.Provider.GetService<ISyntaxProvider>();
        var names = new StringBuilder();
        var values = new StringBuilder();
        var flag = new AssertFlag();
        table.EachColumn(column =>
            {
                if (!flag.AssertTrue())
                {
                    names.Append(",");
                    values.Append(",");
                }
                names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName));
                values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);
            });
        return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
    }
}

4. MySql bulk insert


using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Globalization;
using System.Text;

/// <summary>
/// Provides bulk operations for the MySql.Data component.
/// </summary>
public sealed class MySqlBatcher : IBatcherProvider
{
    /// <summary>
    /// Gets or sets the context of the provider service.
    /// </summary>
    public ServiceContext ServiceContext { get; set; }

    /// <summary>
    /// Bulk inserts the data of a <see cref="DataTable"/> into the database.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to bulk insert.</param>
    /// <param name="batchSize">The number of rows written per batch.</param>
    public void Insert(DataTable dataTable, int batchSize = 10000)
    {
        Checker.ArgumentNull(dataTable, "dataTable");
        if (dataTable.Rows.Count == 0)
        {
            return;
        }
        using (var connection = ServiceContext.Database.CreateConnection())
        {
            try
            {
                connection.TryOpen();
                using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
                {
                    if (command == null)
                    {
                        throw new BatcherException(new ArgumentException("command"));
                    }
                    command.Connection = connection;

                    command.CommandText = GenerateInsertSql(ServiceContext.Database, command, dataTable);
                    if (command.CommandText == string.Empty)
                    {
                        return;
                    }
                    command.ExecuteNonQuery();
                }
            }
            catch (Exception exp)
            {
                throw new BatcherException(exp);
            }
            finally
            {
                connection.TryClose();
            }
        }
    }

    /// <summary>
    /// Generates the SQL statement that inserts the data.
    /// </summary>
    /// <param name="database">The database that supplies the syntax service and the parameter factory.</param>
    /// <param name="command">The command that receives any parameters that are created.</param>
    /// <param name="table">The table whose rows are inserted.</param>
    /// <returns>A single multi-row INSERT statement.</returns>
    private string GenerateInsertSql(IDatabase database, DbCommand command, DataTable table)
    {
        var names = new StringBuilder();
        var values = new StringBuilder();
        var types = new List<DbType>();
        var count = table.Columns.Count;
        var syntax = database.Provider.GetService<ISyntaxProvider>();
        table.EachColumn(c =>
            {
                if (names.Length > 0)
                {
                    names.Append(",");
                }
                names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName));
                types.Add(c.DataType.GetDbType());
            });

        // All rows go into one statement; batchSize is not used.
        var i = 0;
        foreach (DataRow row in table.Rows)
        {
            if (i > 0)
            {
                values.Append(",");
            }
            values.Append("(");
            for (var j = 0; j < count; j++)
            {
                if (j > 0)
                {
                    values.Append(", ");
                }
                var value = row[j];
                if (value is DBNull)
                {
                    // Write NULL explicitly; an empty token would produce invalid SQL.
                    values.Append("NULL");
                    continue;
                }
                var isStrType = IsStringType(types[j]);
                var parameter = CreateParameter(database.Provider, isStrType, types[j], value, syntax.ParameterPrefix, i, j);
                if (parameter != null)
                {
                    values.Append(parameter.ParameterName);
                    command.Parameters.Add(parameter);
                }
                else if (isStrType)
                {
                    values.AppendFormat("'{0}'", value);
                }
                else
                {
                    // Format with the invariant culture so that numbers always use '.' as the decimal separator.
                    values.Append(Convert.ToString(value, CultureInfo.InvariantCulture));
                }
            }
            values.Append(")");
            i++;
        }
        return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
    }

    /// <summary>
    /// Determines whether the type is a string type.
    /// </summary>
    /// <param name="dbType">The type to check.</param>
    /// <returns>true for the string types; otherwise false.</returns>
    private bool IsStringType(DbType dbType)
    {
        return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength;
    }

    /// <summary>
    /// Creates a parameter.
    /// </summary>
    /// <param name="provider">The provider that supplies the parameter factory.</param>
    /// <param name="isStrType">Whether the column is a string type.</param>
    /// <param name="dbType">The column's type.</param>
    /// <param name="value">The value to bind.</param>
    /// <param name="parPrefix">The parameter prefix character.</param>
    /// <param name="row">The row index, used in the parameter name.</param>
    /// <param name="col">The column index, used in the parameter name.</param>
    /// <returns>The parameter, or null when the value can be written inline.</returns>
    private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col)
    {
        // Creating a parameter for every value is very slow, so a parameter is only added
        // for strings that contain a ' or \ character (both are special inside a MySql
        // string literal by default) and for date values.
        if ((isStrType && value.ToString().IndexOfAny(new[] { '\'', '\\' }) != -1) || dbType == DbType.DateTime)
        {
            var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col);
            var parameter = provider.DbProviderFactory.CreateParameter();
            parameter.ParameterName = name;
            parameter.Direction = ParameterDirection.Input;
            parameter.DbType = dbType;
            parameter.Value = value;
            return parameter;
        }
        return null;
    }
}

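MySql's bulk insert writes all of the rows into the VALUES clause of a single statement, one parenthesized group per row, for example: insert batcher(id, name) values (1, '1'), (2, '2'), (3, '3'), ........ (10, '10').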
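To make the shape of that statement concrete, here is a small sketch that builds multi-row INSERT statements from a DataTable, one statement per chunk of batchSize rows. It is a simplified variant, not the class above: every value is referenced through a placeholder named @p_row_col instead of being written inline, the backtick quoting and the @ prefix are assumed for MySql, and the name MultiRowInsertSketch is made up for this example. The matching parameters would still have to be added to the command separately.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;

public static class MultiRowInsertSketch
{
    // Builds one multi-row INSERT per chunk of at most batchSize rows.
    // Values are represented by placeholders of the form @p_<row>_<col>.
    public static List<string> BuildStatements(DataTable table, int batchSize)
    {
        if (table == null) throw new ArgumentNullException(nameof(table));
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var names = string.Join(",", table.Columns.Cast<DataColumn>().Select(c => "`" + c.ColumnName + "`"));
        var statements = new List<string>();

        for (var start = 0; start < table.Rows.Count; start += batchSize)
        {
            var end = Math.Min(start + batchSize, table.Rows.Count);
            var values = new StringBuilder();
            for (var r = start; r < end; r++)
            {
                if (r > start)
                {
                    values.Append(",");
                }
                var rowIndex = r; // copy for the lambda below
                var placeholders = Enumerable.Range(0, table.Columns.Count)
                    .Select(c => "@p_" + rowIndex + "_" + c);
                values.Append("(").Append(string.Join(", ", placeholders)).Append(")");
            }
            statements.Add(string.Format("INSERT INTO `{0}`({1}) VALUES {2}", table.TableName, names, values));
        }
        return statements;
    }
}
```

For a table named batcher with columns id and name, three rows, and a batchSize of 2, this yields two statements. The first ends with VALUES (@p_0_0, @p_0_1),(@p_1_0, @p_1_1) and the second with VALUES (@p_2_0, @p_2_1). Splitting into chunks keeps each statement bounded in size, which the single-statement version above does not do.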

5. Testing

Next, let's write a test case to see how bulk insert performs.


public void TestBatchInsert()
{
    Console.WriteLine(TimeWatcher.Watch(() =>
        InvokeTest(database =>
            {
                var table = new DataTable("Batcher");
                table.Columns.Add("Id", typeof(int));
                table.Columns.Add("Name1", typeof(string));
                table.Columns.Add("Name2", typeof(string));
                table.Columns.Add("Name3", typeof(string));
                table.Columns.Add("Name4", typeof(string));

                // Build 100,000 rows.
                for (var i = 0; i < 100000; i++)
                {
                    table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString());
                }

                // Get the IBatcherProvider.
                var batcher = database.Provider.GetService<IBatcherProvider>();
                if (batcher == null)
                {
                    Console.WriteLine("Bulk insert is not supported.");
                }
                else
                {
                    batcher.Insert(table);
                }

                // Print the number of rows in the Batcher table.
                var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher");
                Console.WriteLine("There are currently {0} rows.", database.ExecuteScalar(sql));
            })));
}
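TimeWatcher.Watch comes from the framework and is not shown in this post. A timing helper of that kind can be sketched with Stopwatch as below; the name TimeWatcherSketch is made up for this example, and the real helper may well differ.

```csharp
using System;
using System.Diagnostics;

public static class TimeWatcherSketch
{
    // Runs the action once and returns how long it took.
    public static TimeSpan Watch(Action action)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));

        var stopwatch = Stopwatch.StartNew();
        action();
        stopwatch.Stop();
        return stopwatch.Elapsed;
    }
}
```

Passing the returned TimeSpan to Console.WriteLine prints it in the hh:mm:ss.fffffff form used for the timings in this section.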

The table below lists the time each of the four databases took to insert 100,000 rows.

Database    Elapsed time
MsSql       00:00:02.9376300
Oracle      00:00:01.5155959
SQLite      00:00:01.6275634
MySql       00:00:05.4166891

 
