我試圖找到一種方法來處理一個隊列中的幾個線程,動態調整消費者的數量。基本上這個任務非常有名:多個生產者創建消息並將它們提交到隊列中,多個消費者處理來自隊列的消息。現在,我想到了使用像System.Collections.Queue.Synchronized,System.Collections.Concurrent.ConcurrentQueue和System.Collections.Concurrent.BlockingCollection這樣的不同組件的不同方法,但我無法決定如何正確地使用它最大效率,所以我會很高興通過您的意見收到一些明智的想法。
這裏有更多的細節:一個隊列的並行處理
- 消息率預計將在某些場合真的密集,但處理將是比較直接的;
- 我不知道應該有多少消費者;
- 我希望進程調整當前消費者的數量,而不是讓他們被阻止,具體取決於消息排隊的數量(意思是我想爲每百個消息填充額外的消費者fe,並且消費者應該如果入隊消息的數量比填充消息所需的數量少50,則停止第三個消費者將在消息數量增長超過300時填充,並且當它下降到250時它應該停止)。
這是主意。現在,我想將ConcurrentQueue封裝到一個封裝了Enqueue方法的類中,並將檢查排隊後的消息數量,並將決定是否啓動額外的消費者。消費者應該在循環內檢查一下應該做出停止的決定。我認爲你會提出一些更有趣的解決方案。
順便說一句,我仍然不知道如何處理的一種情況理論上是當最後一條消息被排隊並且最後一位消費者已經停止的時候。另一種情況也是停滯 - 如果同一時間停止檢查,一些消費者會被停止。我應該如何處理這些情況?
爲了證明這是什麼意思,看看這個例子:
class MessageController
{
private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();
int amountOfConsumers;
public void Enqueue(IMessage message)
{
messageQueue.Add(message); // point two
if (Math.Floor((double)messageQueue.Count/100)+1 > amountOfConsumers) // point three
{
Task.Factory.StartNew(() =>
{
IMessage msg;
while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50)/100)) + 1 >= amountOfConsumers)) //point one
{
msg = messageQueue.Take();
//process msg...
}
ConsumerQuit(); // point four
});
Interlocked.Increment(ref amountOfConsumers);
}
}
public void ConsumerQuit()
{
Interlocked.Decrement(ref amountOfConsumers);
}
}
所以現在當我可以指向特定的代碼行這些問題:
- 當最後消費者發現,沒有消息入隊(@point one),並且在它調用ConsumerQuit方法之前,最後一條消息到達並排入隊列,然後檢查其他消費者是否已完成,結果(@point三)仍然是消費者工作,並且因爲單一消息的一個消費者不止是enou gh - 什麼都沒有發生,那麼ConsumerQuit終於被調用了,並且我有一條消息停留在隊列中。
ConsumerTask | LastMessageThread ------------------------------------------------------ @point one(messageQueue.Count=0) | @point two no time | @point three(amountOfConsumers=1) @point four | ended; ended; | ended;
- 了幾位消費者到了「一點一點」同時檢查時,其中一人應當停止(FE messageQueue.Count爲249),其中幾人將之前因爲停止ConsumerQuit將被召喚其中一個人,其他人也會做這個檢查。
ConsumerTask1 | ConsumerTask2| ConsumerTask3 | ConsumerTask4| ------------------------------------------------------------------------------ @point one(.Count=249;amount=4)| no time | no time | @point one | no time | @point one | processing msg| @point four | @point four | no time | @point one | ended; | ended; | @point four | processing msg| ended; | ended; | ended; | ... | ended; |
在這裏,當已經入隊的最後一個消息的情況下,我們有有單獨處理249個消息的一個消費者任務左側,但最壞的情況可能是,如果他們都將停止,後最後一條消息,潛在的數百條消息將被卡住。
使用內核同步 - BlockingCollection等回調如果性能不足。 – 2013-03-02 01:30:24
你的問題可以通過使用'ThreadPool'類並調整最小和最大線程數來解決。請參閱[此MSDN資源](http://msdn.microsoft.com/en-en/library/0ka9477y(v = vs.80).aspx) – didierc 2013-03-05 19:02:01
嗯,我不認爲'ThreadPool'中的線程很短否則,線程的最小數量的含義是什麼?由於線程是爲了運行定義明確的任務,所以線程可能只是在無所事事的時候進入睡眠狀態。 – didierc 2013-03-05 20:29:04