2014-10-27 74 views
0

我有一個c#應用程序正在從一個數據庫中取數據,進行必要的轉換,並將數據插入到另一個數據庫的表中。我通過將我的源數據插入到隊列中,然後處理隊列以將數據插入到目標表中來完成此操作。我有兩個獨立的線程來讀取源數據並寫入目標數據。閱讀線程的運行速度比寫入線程快得多,所以我的隊列填滿很快。從ConcurrentQueue插入多行到SQL Server表中

正如您在閱讀線程中所看到的,我使用SqlCommand.ExecuteReader()來讀取數據。然後我循環遍歷隊列併爲每一行分別執行INSERT語句。我想象的是,不是逐行插入,而是做某種(可能是Linq語句)。有沒有人有任何想法如何更快地插入我的插入?

隊列定義:

private static readonly BlockingCollection<HistorianData> ValueQueue = new BlockingCollection<HistorianData>(new ConcurrentQueue<HistorianData>(), 1000000); 

閱讀:

public static void EnqueueHistorianData(SqlConnection connection, int idToAdd, DateTime minDatetime, DateTime maxDateTime, string cluster, string dbName, string dataTable, string idTable, string mainIdColumn, string foreignIdColumn, string dateColumn, string nameColumn, string valueColumn) 
    { 
     StringBuilder select = new StringBuilder(); 
     HistorianData values; 

     select.Append(String.Format("SELECT {0}.{1},", dataTable, dateColumn)); 
     select.Append(String.Format(" '{0}.' + {1}.{2},", cluster, idTable, nameColumn)); 
     select.Append(String.Format(" {0}.{1}", dataTable, valueColumn)); 
     select.Append(String.Format(" FROM {0}.{1}", dbName, dataTable)); 
     select.Append(String.Format(" JOIN {0}.{1}", dbName, idTable)); 
     select.Append(String.Format(" ON {0}.{1} = {2}.{3}", dataTable, foreignIdColumn, idTable, mainIdColumn)); 
     select.Append(String.Format(" INNER JOIN Runtime.dbo.Tag")); 
     select.Append(String.Format(" ON Runtime.dbo.Tag.TagName = '{0}.' + {1}.{2}", cluster, idTable, nameColumn)); 
     select.Append(String.Format(" WHERE {0}.{1} >= '{2}'", dataTable, dateColumn, minDatetime.ToString())); 
     select.Append(String.Format(" AND {0}.{1} = {2}", dataTable, foreignIdColumn, idToAdd.ToString())); 
     select.Append(String.Format(" AND {0}.{1} >= '{2}'", dataTable, dateColumn, minDatetime.ToString())); 
     select.Append(String.Format(" AND {0}.{1} < '{2}'", dataTable, dateColumn, maxDateTime.ToString())); 

     using (var command = new SqlCommand(select.ToString(), connection)) 
     { 
      command.CommandTimeout = 1000; 

      using (var reader = command.ExecuteReader()) 
      { 
       if (reader.HasRows) 
       { 
        while (reader.Read()) 
        { 
         values = new HistorianData(); 

         values.SampleDate = reader.GetDateTime(0); 
         values.TagName = reader.GetString(1); 
         values.TagValue = reader.GetDouble(2); 

         ValueQueue.Add(values); 
        } 

        values = null; 
        reader.Close(); 
       } 
      } 
     } 
    } 

寫作:

public static void WriteQueueValuesToHistorian(string connectionString) 
    { 
     HistorianData values; 

     using (var connection = new SqlConnection(connectionString)) 
     { 
      connection.Open(); 

      using (SqlCommand insertCommand = connection.CreateCommand()) 
      { 
       insertCommand.CommandType = CommandType.Text; 
       insertCommand.CommandText = "INSERT INTO Runtime.dbo.History (DateTime, TagName, Value, QualityDetail) VALUES (@P1, @P2, @P3, 192)"; 
       insertCommand.CommandTimeout = 1000; 

       var param1 = new SqlParameter("@P1", SqlDbType.DateTime); 
       insertCommand.Parameters.Add(param1); 

       var param2 = new SqlParameter("@P2", SqlDbType.NVarChar, 512); 
       insertCommand.Parameters.Add(param2); 

       var param3 = new SqlParameter("@P3", SqlDbType.Float); 
       insertCommand.Parameters.Add(param3); 

       insertCommand.Prepare(); 

       while (!ValueQueue.IsCompleted && ValueQueue.TryTake(out values, System.Threading.Timeout.Infinite)) 
       { 
        int retries = 0; 

        while (retries < 3) 
        { 
         insertCommand.Parameters["@P1"].Value = values.SampleDate.ToLocalTime(); 
         insertCommand.Parameters["@P2"].Value = values.TagName; 
         insertCommand.Parameters["@P3"].Value = values.TagValue; 

         try 
         { 
          insertCommand.ExecuteNonQuery(); 
          retries = 4; 
         } 
         catch (SqlException) 
         { 
          retries += 1; 
          sw.WriteLine("SQLException - Values: " + insertCommand.Parameters["@P1"].Value + ", " + insertCommand.Parameters["@P2"].Value + ", " + insertCommand.Parameters["@P3"].Value); 
         } 
        } 
       } 
      } 
     } 
    } 
+0

的[SqlBulkCopy的](http://msdn.microsoft.com/en-us/library/system.data.sqlclient.sqlbulkcopy(V = vs.110)的.aspx)類確實具有優化從讀取內置的SqlDataReader中讀取。而不是將記錄讀取到併發隊列中,然後將它們寫回來,是否可以在第一個函數的'reader'上使用SqlBulkCopy? – 2014-10-27 15:10:26

+0

我試着走這條路。我的目標數據庫實際上是專有的,並通過使用定製OLEDB接口的鏈接服務器進行引用。批量插入不適用於鏈接的服務器。 – clintperry 2014-10-27 16:10:07

+0

連接字符串是否都轉到同一個SQL服務器上用於這兩種功能(鏈接服務器是否與源數據庫在同一臺服務器上鍊接)?如果是這樣,爲什麼你完全通過C#,爲什麼不直接在單個查詢中查詢呢? – 2014-10-27 16:26:19

回答

0

您可以從您檢索數據的DataTable,並使用SqlBulkCopy的方法。請檢查以下網址;

http://msdn.microsoft.com/en-us/library/ex21zs8x.aspx

+0

我試着走這條路。我的目標數據庫實際上是專有的,並通過使用定製OLEDB接口的鏈接服務器進行引用。批量插入不適用於鏈接的服務器。 – clintperry 2014-10-27 16:11:05

+0

使用2個不同的連接字符串與DatabaseHelper類怎麼樣?如果這不起作用,那麼你也可以嘗試BulkInsert方法:http://msdn.microsoft.com/en-us/library/ms175915.aspx – Hozikimaru 2014-10-27 16:13:32