2017-02-16 165 views
0

我問了一個問題here關於爲什麼使用Thread.Run啓動一個進程並沒有像我期望的那樣執行儘可能多的併發請求。同時處理rabbitmq消息

這個問題背後的原因是我試圖創建一個類,它可以將消息從rabbitmq隊列中拉出並同時處理它們達到最大併發消息數。

爲此,我在EventingBasicConsumer類的Received處理程序中結束了以下操作。

async void Handle(EventArgs e) 
{ 
    await _semaphore.WaitAsync(); 

    var thread = new Thread(() => 
    { 
     Process(e); 
     _semaphore.Release(); 
     _channel.BasicAck(....); 
    }); 
    thread.Start(); 
} 

但是,對上一篇文章的評論並不是要啓動一個線程,除非進行CPU綁定工作。

上述處理程序不知道該作品是CPU綁定,網絡,磁盤還是其他。 (Process是一種抽象方法)。

即使如此,我認爲我必須在這裏啓動一個線程或任務,否則Process方法會阻塞rabbitmq線程,並且在完成之前不會再調用該事件處理程序。所以我只能一次處理一種方法。

在這裏開始一個新的Thread好嗎?最初我曾經使用過Task.Run,但是這並沒有產生儘可能多的工人。查看其他帖子。

僅供參考。通過在信號量上設置InitialCount來限制併發線程的數量。

回答

0

正如已經在鏈接問題中所說的,大量的線程並不能保證性能,就好像它們的數量超過了邏輯內核的數量一樣,你得到了一個thread starvation的情況,沒有真正的工作正在進行。

但是,如果您仍然需要處理併發操作的數量,您可以試試TPL Dataflow庫,設置爲MaxDegreeOfParallelism,如this tutorial

var workerBlock = new ActionBlock<EventArgs>(
    // Process event 
    e => Process(e), 
    // Specify a maximum degree of parallelism. 
    new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = InitialCount 
    }); 
var bufferBlock = new BufferBlock(); 
// link the blocks for automatically propagading the messages 
bufferBlock.LinkTo(workerBlock); 

// asynchronously send the message 
await bufferBlock.SendAsync(...); 
// synchronously send the message 
bufferBlock.Post(...); 

BufferBlock是一個隊列,所以消息的順序將被保留。此外,您還可以連接帶有過濾拉姆達塊添加不同的處理程序(具有不同的並行度):

bufferBlock.LinkTo(cpuWorkerBlock, e => e is CpuEventArgs); 
bufferBlock.LinkTo(networkWorkerBlock, e => e is NetworkEventArgs); 
bufferBlock.LinkTo(diskWorkerBlock, e => e is DiskEventArgs); 

,但在這種情況下,您應該設置在鏈末端的默認處理程序,所以消息不會消失(你可以使用一個NullTarget塊這樣):

bufferBlock.LinkTo(DataflowBlock.NullTarget<EventArgs>); 

而且,該塊可以是一個觀察者,所以他們完全與Reactive Extensions在UI方面的工作。