2013-03-02 84 views
4

我試圖找到一種方法來處理一個隊列中的幾個線程,動態調整消費者的數量。基本上這個任務非常有名:多個生產者創建消息並將它們提交到隊列中,多個消費者處理來自隊列的消息。現在,我想到了使用像System.Collections.Queue.Synchronized,System.Collections.Concurrent.ConcurrentQueue和System.Collections.Concurrent.BlockingCollection這樣的不同組件的不同方法,但我無法決定如何正確地使用它最大效率,所以我會很高興通過您的意見收到一些明智的想法。
這裏有更多的細節:一個隊列的並行處理

  • 消息率預計將在某些場合真的密集,但處理將是比較直接的;
  • 我不知道應該有多少消費者;
  • 我希望進程調整當前消費者的數量,而不是讓他們被阻止,具體取決於消息排隊的數量(意思是我想爲每百個消息填充額外的消費者fe,並且消費者應該如果入隊消息的數量比填充消息所需的數量少50,則停止第三個消費者將在消息數量增長超過300時填充,並且當它下降到250時它應該停止)。

這是主意。現在,我想將ConcurrentQueue封裝到一個封裝了Enqueue方法的類中,並將檢查排隊後的消息數量,並將決定是否啓動額外的消費者。消費者應該在循環內檢查一下應該做出停止的決定。我認爲你會提出一些更有趣的解決方案。

順便說一句,我仍然不知道如何處理的一種情況理論上是當最後一條消息被排隊並且最後一位消費者已經停止的時候。另一種情況也是停滯 - 如果同一時間停止檢查,一些消費者會被停止。我應該如何處理這些情況?

爲了證明這是什麼意思,看看這個例子:

class MessageController 
{ 
    private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>(); 

    int amountOfConsumers; 

    public void Enqueue(IMessage message) 
    { 
     messageQueue.Add(message); // point two 

     if (Math.Floor((double)messageQueue.Count/100)+1 > amountOfConsumers) // point three 
     { 
      Task.Factory.StartNew(() => 
      { 
       IMessage msg; 
       while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50)/100)) + 1 >= amountOfConsumers)) //point one 
       { 
        msg = messageQueue.Take(); 
        //process msg... 
       } 

       ConsumerQuit(); // point four 
      }); 

      Interlocked.Increment(ref amountOfConsumers); 
     } 
    } 

    public void ConsumerQuit() 
    { 
     Interlocked.Decrement(ref amountOfConsumers); 
    } 
} 

所以現在當我可以指向特定的代碼行這些問題:

  • 當最後消費者發現,沒有消息入隊(@point one),並且在它調用ConsumerQuit方法之前,最後一條消息到達並排入隊列,然後檢查其他消費者是否已完成,結果(@point三)仍然是消費者工作,並且因爲單一消息的一個消費者不止是enou gh - 什麼都沒有發生,那麼ConsumerQuit終於被調用了,並且我有一條消息停留在隊列中。
ConsumerTask      | LastMessageThread 
------------------------------------------------------ 
@point one(messageQueue.Count=0) | @point two 
no time       | @point three(amountOfConsumers=1) 
@point four      | ended; 
ended;        | ended; 
  • 了幾位消費者到了「一點一點」同時檢查時,其中一人應當停止(FE messageQueue.Count爲249),其中幾人將之前因爲停止ConsumerQuit將被召喚其中一個人,其他人也會做這個檢查。
ConsumerTask1     | ConsumerTask2| ConsumerTask3 | ConsumerTask4| 
------------------------------------------------------------------------------ 
@point one(.Count=249;amount=4)| no time  | no time  | @point one | 
no time      | @point one | processing msg| @point four | 
@point four     | no time  | @point one | ended;  | 
ended;       | @point four | processing msg| ended;  | 
ended;       | ended;  | ...   | ended;  | 

在這裏,當已經入隊的最後一個消息的情況下,我們有有單獨處理249個消息的一個消費者任務左側,但最壞的情況可能是,如果他們都將停止,後最後一條消息,潛在的數百條消息將被卡住。

+0

使用內核同步 - BlockingCollection等回調如果性能不足。 – 2013-03-02 01:30:24

+0

你的問題可以通過使用'ThreadPool'類並調整最小和最大線程數來解決。請參閱[此MSDN資源](http://msdn.microsoft.com/en-en/library/0ka9477y(v = vs.80).aspx) – didierc 2013-03-05 19:02:01

+0

嗯,我不認爲'ThreadPool'中的線程很短否則,線程的最小數量的含義是什麼?由於線程是爲了運行定義明確的任務,所以線程可能只是在無所事事的時候進入睡眠狀態。 – didierc 2013-03-05 20:29:04

回答

1

看來,我終於想出了一個解決方案,但不確定性能。請考慮下面的代碼,任何反饋將不勝感激!我仍然希望看到一些其他的解決方案或想法,即使它們是完全不同的,並且需要對方法進行重大改變。這是客觀的:「一種方式來處理多個線程隊列,動態調整消費者的數量」

class MessageController 
{ 
    private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>(); 

    private ManualResetEvent mre = new ManualResetEvent(true); 

    private int amountOfConsumers; 

    object o = new object(); 

    public void Enqueue(IMessage message) 
    { 
     messageQueue.Add(message); 

     mre.WaitOne(); 
     if (Math.Floor((double)messageQueue.Count/100)+1 > amountOfConsumers) 
     { 
      Interlocked.Increment(ref amountOfConsumers); 

      var task = Task.Factory.StartNew(() => 
      { 
       IMessage msg; 
       bool repeat = true; 

       while (repeat) 
       { 
        while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50)/100)) + 1 >= amountOfConsumers)) 
        { 
         msg = messageQueue.Take(); 
         //process msg... 
        } 

        lock (o) 
        { 
         mre.Reset(); 

         if ((messageQueue.Count == 0) || (Math.Ceiling((double)((messageQueue.Count + 51)/100)) < amountOfConsumers)) 
         { 
          ConsumerQuit(); 
          repeat = false; 
         } 

         mre.Set(); 
        } 
       } 
      }); 
     } 
    } 

    public void ConsumerQuit() 
    { 
     Interlocked.Decrement(ref amountOfConsumers); 
    } 
} 
+0

我不確定我是否理解了您真正想要解決的問題,但只要您有解決方案,那就太好了。感謝分享! – didierc 2013-03-05 20:41:18

+0

非常樂意分享。那麼這個答案主要解決問題的最後部分(我在那裏提到了確切的代碼行)。但我仍然不知道這是否會帶來好的表現。我希望能從別人那裏得到一些見解。 – Jyrkka 2013-03-05 21:01:48

1

我最初的想法是,你是倒退設計這個。

在考慮並行性時,通過向單個任務添加更多線程可能無法總是獲得效率。有時最好的數字等於或小於您正在使用的機器上的內核數量。原因是你正在創造更多的開銷和鎖定爭用和線程切換。

通過增加更多消費者,您可能會發現消費率實際上下降而不是增加。

需要考慮的一件事是處理消息需要多長時間?如果這個時間明顯長於生成任務所需的時間,爲什麼不讓一個消費者創建一個新的任務來處理每條消息?

class MessageController 
{ 
    private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>(); 

    public void Enqueue(IMessage message) 
    { 
     messageQueue.Add(message); 
    } 

    public void Consume() 
    { 
     //This loop will not exit until messageQueue.CompleteAdding() is called 
     foreach (var item in messageQueue.GetConsumingEnumerable()) 
     { 
      IMessage message = item; 
      Task.Run(() => ProcessMessage(message); 
     } 
    } 
} 
+0

消息將被處理得相對較快,它會以某種方式分析並給出。使用每條消息創建新任務是一種有趣的方法,但正如我所說,消息速率預計會非常密集,並且我認爲擁有數千個任務運行並不是一個好主意,嘿,還沒有你說最好的數字是否等於或少於內核的數量?:) – Jyrkka 2013-03-05 19:19:48

+0

擁有數千個任務可以是高效的。只是你並不總是想讓很多任務做同樣的事情。例如,處理消息執行不同的任務,因爲每個消息都不同,因此處理消息的行爲是不同的。如果有數以千計的試圖消費同一個集合的任務可能不會帶來好處,只需要幾個過程來完成任務。 – 2013-03-05 19:24:15

+0

我會建議嘗試以下內容,並且不要爲每條消息創建一個新任務。您可能會發現,這足以滿足您的需求。 – 2013-03-05 19:30:09