2014-09-05

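Measuring completion time across multiple threads

The use case: I have a huge log file that I read in chunks (equal-sized IO reads) on the main thread. Reading each chunk takes roughly 1 second on my test machine. After reading a chunk, I use the thread pool to start a thread per chunk that inserts it into one of 2 database instances. I now have two challenges: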

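  1. I have to insert the chunks into the 2 DBs alternately, i.e. odd chunks go to the first DB and even chunks go to the second DB. Nothing in the chunk model tells me the chunk's number in a way I can rely on. I tried creating a wrapper around the chunk model that has a "chunkCount", but where do I increment chunkCount?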

  2. How do I measure the time taken by each insert, given that the inserts run on different thread-pool threads?

Below is the code I have been experimenting with, but it does not produce any meaningful results:

logEventsChunk = logFetcher.GetNextLogEventsChunk();
chunkModel = new LogEventChunkModel();
stw = new Stopwatch();
chunkModel.ChunkCount = chunkCount;
chunkModel.LogeventChunk = logEventsChunk;

//chunkCount++;
ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object state)
{ InsertChunk(chunkModel, collection, secondCollection, stw); }), null);

The InsertChunk method is here:

private void InsertChunk(LogEventChunkModel logEventsChunk, MongoCollection<LogEvent> collection, MongoCollection<LogEvent> secondCollection, Stopwatch stw)
{
    chunkCount++;
    stw.Start();
    MongoInsertOptions options = new MongoInsertOptions();
    options.WriteConcern = WriteConcern.Unacknowledged;
    options.CheckElementNames = true;
    string db = string.Empty;
    {
        //DateTime dtWrite = DateTime.Now;
        if (logEventsChunk.ChunkCount % 2 == 0)
        {
            DateTime dtWrite1 = DateTime.Now;
            collection.InsertBatch(logEventsChunk.LogeventChunk.LogEvents, options);
            db = "FirstDB";
            //Console.WriteLine("Time taken to write the chunk: " + DateTime.Now.Subtract(dtWrite1).TotalSeconds.ToString() + " s. " + db);
        }
        else
        {
            DateTime dtWrite2 = DateTime.Now;
            secondCollection.InsertBatch(logEventsChunk.LogeventChunk.LogEvents, options);
            db = "SecondDB";
            //Console.WriteLine("Time taken to write the chunk: " + DateTime.Now.Subtract(dtWrite2).TotalSeconds.ToString() + " s. " + db);
        }
        Console.WriteLine("Thread Completed: {0} **********", Thread.CurrentThread.GetHashCode());
        stw.Stop();
        Console.WriteLine("Time taken to write the chunk: " + stw.ElapsedMilliseconds + " ms. " + db + " Chunk Count: " + logEventsChunk.ChunkCount);
        stw.Reset();

        //+ "Chunk Count: " + chunkCount.ToString()
        //Console.WriteLine("Time taken to write the chunk: " + DateTime.Now.Subtract(dtWrite).TotalSeconds.ToString() + " s. "+db);
        //mongoDBInsertionTotalTime += DateTime.Now.Subtract(dtWrite).TotalSeconds;
    }
}

Please ignore the commented-out lines; they are just leftovers from some experiments.

Answers

1

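Rather than starting a new thread for each insert and having that thread work out which database to write to, start two persistent threads, each of which writes to a single database. Those threads get their data from a queue. This is a very standard producer/consumer setup using BlockingCollection<T>.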

So, you have:

// Maximum number of items in queue (to avoid out of memory errors) 
const int MaxQueueSize = 10000; 
BlockingCollection<LogEventChunkModel> Db1Queue = new BlockingCollection<LogEventChunkModel>(MaxQueueSize); 
BlockingCollection<LogEventChunkModel> Db2Queue = new BlockingCollection<LogEventChunkModel>(MaxQueueSize); 
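
The capacity passed to the constructor is what keeps memory in check: once a queue holds that many items, Add blocks the producer until a consumer removes something. Here is a minimal sketch of that behaviour. The capacity of 2 and the plain strings standing in for LogEventChunkModel are assumptions made only to keep the example short, and TryAdd is used so the demo reports a full queue instead of blocking.

```csharp
using System;
using System.Collections.Concurrent;

static class BoundedQueueDemo
{
    // Returns true if a third item could be added to a queue bounded at 2 items.
    public static bool ThirdItemAccepted()
    {
        using (var queue = new BlockingCollection<string>(2))
        {
            queue.Add("chunk 0");
            queue.Add("chunk 1");

            // The queue is now full. Add would block here until a consumer
            // takes an item; TryAdd gives up immediately and returns false.
            return queue.TryAdd("chunk 2");
        }
    }

    static void Main()
    {
        Console.WriteLine("Third item accepted: {0}", ThirdItemAccepted());
    }
}
```

In the real program you would keep calling Add, so that a slow database simply makes the file reader wait rather than letting the queue grow without limit.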

In your main thread, start the database update threads:

var t1 = new Thread(DbWriteThreadProc); 
t1.Start(new Tuple<string, BlockingCollection<LogEventChunkModel>>("FirstDB", Db1Queue)); 

var t2 = new Thread(DbWriteThreadProc); 
t2.Start(new Tuple<string, BlockingCollection<LogEventChunkModel>>("SecondDB", Db2Queue));

Then, start reading the log file and put alternate chunks into the two queues:

int chunkCount = 0;
while (!EndOfLogFile)
{
    var chunk = GetNextChunk();
    // even-numbered chunks go to the first queue, odd-numbered to the second
    if ((chunkCount % 2) == 0)
        Db1Queue.Add(chunk);
    else
        Db2Queue.Add(chunk);
    ++chunkCount;
}

// end of data, so mark the queues as complete 
Db1Queue.CompleteAdding(); 
Db2Queue.CompleteAdding(); 

// and wait for threads to complete processing the queues 
t1.Join(); 
t2.Join(); 

Your writer thread procedure is very simple. All it does is service its queue and write to the database:

void DbWriteThreadProc(object state)
{
    // passed object is a Tuple<string, BlockingCollection<LogEventChunkModel>>
    // Get the items from it
    var threadData = (Tuple<string, BlockingCollection<LogEventChunkModel>>)state;
    string dbName = threadData.Item1;
    BlockingCollection<LogEventChunkModel> queue = threadData.Item2;

    // now read the queue and write to the database
    foreach (var chunk in queue.GetConsumingEnumerable())
    {
        // a fresh stopwatch per chunk, local to this thread
        var sw = Stopwatch.StartNew();
        // write chunk to the database.
        sw.Stop();
        Console.WriteLine("{0}: time to write = {1:N0} ms", dbName, sw.ElapsedMilliseconds);
    }
}

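GetConsumingEnumerable does a non-busy wait on the queue, so it is not continually polling. The loop ends once the queue is empty and has been marked as complete for adding (which is why the main thread calls CompleteAdding).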
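
To make the completion rule concrete, here is a small single-threaded sketch. The integer items and the class and method names are made up for illustration. It fills a queue, marks it complete, and shows that the consuming loop drains the items in order and then exits, and that a later Add is rejected.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

static class CompletionDemo
{
    // Fills a queue, marks it complete, then drains it with GetConsumingEnumerable.
    public static List<int> Drain()
    {
        var queue = new BlockingCollection<int>();
        queue.Add(1);
        queue.Add(2);
        queue.Add(3);
        queue.CompleteAdding(); // tell consumers that no more items will arrive

        var seen = new List<int>();
        // Items come out in FIFO order. Because adding is complete, the loop
        // exits as soon as the queue is empty instead of waiting for more.
        foreach (int item in queue.GetConsumingEnumerable())
            seen.Add(item);
        return seen;
    }

    // Returns true if Add is rejected once the queue has been marked complete.
    public static bool AddAfterCompleteThrows()
    {
        var queue = new BlockingCollection<int>();
        queue.CompleteAdding();
        try
        {
            queue.Add(42);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    static void Main()
    {
        Console.WriteLine("Drained: " + string.Join(", ", Drain()));
        Console.WriteLine("Add after CompleteAdding throws: " + AddAfterCompleteThrows());
    }
}
```

Without the CompleteAdding call, the foreach in Drain would wait forever for a fourth item, which is exactly what would keep the writer threads (and the Join calls) from ever finishing.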

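This approach has several advantages over what you have. In particular, it makes it trivial to determine which database a chunk is written to. It also uses at most three threads, and it guarantees that chunks are added to each database in the same order in which they were read from the log file. Your approach with QueueUserWorkItem cannot guarantee insertion order. It also dispatches every insert as a separate thread-pool work item, so you can end up with a large number of threads running concurrently.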
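
Putting the pieces together, here is a self-contained sketch of the whole pipeline that you can run without MongoDB. Everything database-related is a stand-in: FakeDb is a hypothetical class that just records chunk numbers in a list where the real code would call InsertBatch, and the chunks are plain integers rather than LogEventChunkModel instances.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

static class TwoWriterPipeline
{
    // Hypothetical stand-in for one database: remembers what it was given, in arrival order.
    sealed class FakeDb
    {
        public readonly string Name;
        public readonly List<int> Written = new List<int>();
        public readonly BlockingCollection<int> Queue = new BlockingCollection<int>(100);
        public FakeDb(string name) { Name = name; }
    }

    // Consumer: services a single queue and times each "insert" separately.
    static void WriterProc(object state)
    {
        var db = (FakeDb)state;
        foreach (int chunk in db.Queue.GetConsumingEnumerable())
        {
            var sw = Stopwatch.StartNew(); // one stopwatch per insert, never shared between threads
            db.Written.Add(chunk);         // placeholder for the real database write
            sw.Stop();
            Console.WriteLine("{0}: chunk {1} written in {2:N0} ms", db.Name, chunk, sw.ElapsedMilliseconds);
        }
    }

    // Producer: routes chunk numbers 0..chunkTotal-1 alternately to the two writers.
    public static List<int>[] Run(int chunkTotal)
    {
        var dbs = new[] { new FakeDb("FirstDB"), new FakeDb("SecondDB") };
        var threads = new Thread[dbs.Length];
        for (int i = 0; i < dbs.Length; i++)
        {
            threads[i] = new Thread(WriterProc);
            threads[i].Start(dbs[i]);
        }

        // The counter lives only on the producer thread, so it needs no locking.
        for (int chunkCount = 0; chunkCount < chunkTotal; chunkCount++)
            dbs[chunkCount % 2].Queue.Add(chunkCount);

        foreach (var db in dbs) db.Queue.CompleteAdding();
        foreach (var t in threads) t.Join();
        return new[] { dbs[0].Written, dbs[1].Written };
    }

    static void Main()
    {
        var result = Run(6);
        Console.WriteLine("FirstDB:  " + string.Join(", ", result[0]));
        Console.WriteLine("SecondDB: " + string.Join(", ", result[1]));
    }
}
```

The interleaving of the per-chunk timing lines will vary from run to run, but each database always receives its own chunks in the order the producer queued them, because only one thread ever writes to it.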