2011-11-29 111 views
2

我有一個數據庫同步任務需要一些時間來處理,因爲在120k葉子記錄的區域中,但是它們是遠程的,訪問相對較慢。使用線程隊列管理長時間運行的數據處理任務

目前,我的應用程序做的

  1. 所有本地聯繫人
  2. 對於每個本地聯繫人獲取列表中的相當幼稚的過程中,讓所有的相關數據
  3. 然後得到相應的遠程接觸
  4. 比較這兩個並做的東西,使他們同步

步驟1返回數據之前,它完成,和步驟4不涉及同一組中不同聯繫人之間的比較。

我希望做的是使用某種排隊構造,並開始在步驟1中填充它,然後立即進入步驟2並開始使用多線程處理它們進來的項目。

然後,該過程變爲:

  1. 開始填充隊列與聯繫人
  2. 雖然有在隊列
  3. 啓動一個線程和項目:
    1. 以從隊列前接觸
    2. 取回遠程聯繫人
    3. 將它們比較
    4. 執行所需的更新

我是在說我可以創建一個新的ConcurrentQueue假設是正確的,開始填充它,然後遍歷它,因爲我可能單線程簡單集合?

(我不把任何錯誤檢查或實際線程,以保持例子簡單)

class Program 
{ 
    static void Main(string[] args) 
    { 
     Processor p = new Processor(); 
     p.Process(); 
    } 
} 


class Processor 
{ 
    bool FetchComplete = false; 
    ConcurrentQueue<Contact> q = new ConcurrentQueue<Contact>(); 

    public void Process() 
    { 
     this.PopulateQueue(); // this will be fired off using QueueUserWorkItem for example 

     while (FetchComplete == false) 
     { 
      if (q.Count > 0) 
      { 
       Contact contact; 
       q.TryDequeue(out contact); 
       ProcessContact(contact); // this will also be in QueueUserWorkItem 
      } 
     } 
    } 


    // a long running process that fills the queue with Contacts 
    private void PopulateQueue() 
    { 
     this.FetchComplete = false; 
     // foreach contact in database 
     Contact contact = new Contact(); // contact will come from DB 
     this.q.Enqueue(contact); 
     // end foreach 

     this.FetchComplete = true; 
    } 

    private void ProcessContact(Contact contact) 
    { 
     // do magic with contact 
    } 
} 
+0

你有沒有考慮過ThreadPool?您可以創建一個名爲Job的類,然後定義每個聯繫人的作業。然後,您可以創建一個處理作業列表的JobManager。作業將保留在具有固定數量線程的ThreadPool中。 –

回答

2

你可能會使用的BlockingCollection代替ConcurrentQueue會更好。原因是前者會阻止線程調用Take,直到項目出現在隊列中。當處理Contract實例的線程清除隊列時,這在提取線程全部檢索完畢之前會很有用。

一般而言,您的策略非常穩固。我用它所有的時間。它通常被稱爲生產者 - 消費者模式。當處理涉及多於兩個階段時,則稱爲管線模式。在這種情況下,你會有2個或更多的隊列,而不是典型的隊列。您可以想象每個階段通過另一個隊列將工作項目轉發到下一個階段的場景。

+1

另一個新集合:)我可能會使用它,而不是隊列。 – Cylindric

相關問題