3

我有以下場景。一次阻止收集進程n項 - 儘快完成1

  1. 我從數據庫中將50個作業轉換爲阻塞集合。

  2. 每個工作都是長期運行的。 (可能可能)。所以我想在一個單獨的線程中運行它們。 (我知道 - 將它們作爲Task.WhenAll運行並讓TPL找出它可能會更好 - 但我想控制同時運行多少次)

  3. 說我想同時運行其中5個(可配置)

  4. 我創建了5個任務(TPL),每個任務一個並行運行。

我想要做的是儘快拿起阻塞集合中的下一個工作的步驟4的工作之一是完成和繼續下去,直到所有50個完成。

我正在考慮創建一個Static blockingCollection和一個TaskCompletionSource,它將在作業完成時調用,然後可以再次調用使用者一次從隊列中選擇一個作業。我還想在每個工作上調用異步/等待 - 但這是最​​重要的 - 不確定這是否會對方法產生影響。

這是正確的方式來完成我想要做的?

this鏈接相似,但是捕獲是我想要在前N個項目之一完成後立即處理下一個作業。畢竟N沒有完成。

更新:

好吧,我有這個代碼片斷做我想要什麼,如果有人想以後使用它。正如您在下面看到的,創建了5個線程,每個線程在完成當前任務時開始下一個任務。在任何給定時間只有5個線程處於活動狀態。我知道這總是可能無法100%地工作,並且如果與一個cpu /內核一起使用,將會出現上下文切換的性能問題。

var block = new ActionBlock<Job>(
       job => Handler.HandleJob(job), 
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

       foreach (Job j in GetJobs()) 
        block.SendAsync(j); 

工作2開始螺紋:13。等待時間:3600000ms。時間:2014年8月29日 3:14:43 PM

作業4在線程上啓動:14。等待時間:15000ms。時間:2014年8月29日 3:14:43 PM

作業0在線程上啓動:7。等待時間:600000ms。時間:2014年8月29日 3:14:43 PM

作業1在線程上啓動:12。等待時間:900000ms。時間:2014年8月29日 3:14:43 PM

作業3從線程開始:11。等待時間:120000ms。時間:2014年8月29日 3:14:43 PM

作業4完成線程:14。 8/29/2014 3:14:58 PM

作業5從線程開始:14。等待時間:1800000ms。時間:2014年8月29日 3:14:58 PM

作業3在線程完成:11。 8/29/2014 3:16:43 PM

作業6從線程開始:11。等待時間:1200000ms。時間:2014年8月29日 3:16:43 PM

作業0在線程上完成:7。 8/29/2014 3:24:43 PM

作業7開始於線程:7。等待時間:30000ms。時間:8/29/2014 3:24:43 PM

作業7在線程完成:7。 2014年8月29日下午3時25分13秒

工作8日開始上線:7。等待時間:100000ms。時間:2014年8月29日 下午3時25分13秒

工作8完成上線:7。 8/29/2014 3:26:53 PM

作業9在線程上啓動:7。等待時間:900000ms。時間:2014年8月29日 3:26:53 PM

作業1在線程上完成:12。 2014年8月29日下午3時29分43秒

工作10開始上線:12。等待時間:300000ms。時間:8/29/2014 3:29:43 PM

作業10在線程上完成:12。 8/29/2014 3:34:43 PM

作業11從線程開始:12。等待時間:600000ms。時間:2014年8月29日 3:34:43 PM

作業6在線程完成:11。 8/29/2014 3:36:43 PM

作業12從線程開始:11。等待時間:300000ms。時間:2014年8月29日 下午三時36分43秒

工作12完成上線:11。 8/29/2014 3:41:43 PM

作業13從線程開始:11。等待時間:100000ms。時間:2014年8月29日 下午3時41分43秒

工作9完成了螺紋:7。 8/29/2014 3:41:53 PM

作業14在線程上啓動:7。等待時間:300000ms。時間:2014年8月29日 下午3點41分53秒

工作13完成上線:11。 8/29/2014 3:43:23 PM

作業11在線程完成:12。 8/29/2014 3:44:43 PM

作業5在線程完成:14。 8/29/2014 3:44:58 PM

作業14在線程上完成:7。 8/29/2014 3:46:53 PM

作業2在線程完成:13。 2014年8月29日下午四點十四分43秒

+0

關於你提到的更新:我的建議不應該有問題在單核機器上,因爲TPL可以優化並選擇比max(5)更低的並行度以減少上下文切換。 – i3arnon 2014-08-29 21:07:55

+0

另一個說明:我使用'block.Post(item)'是有原因的。當你沒有在ActionBlock上設置BoundedCapcity時,使用'await block.SendAsync(item)'是多餘的,它會(非常輕微)傷害性能。 – i3arnon 2014-08-29 21:10:14

+0

是的,但如果你在我的代碼示例中注意到,我不再使用異步作業=>等待job.ProcessAsync()和我想,然後使用block.SendAsync可能有幫助嗎? – 2014-08-29 21:26:04

回答

3

您可以使用TPL Dataflow輕鬆實現您所需的功能。

你可以做的是使用BufferBlock<T>,這是一個緩衝區來存儲你的數據,並將它與ActionBlock<T>連接在一起,當它們從BufferBlock<T>進入時,它們將消耗這些請求。

現在,這裏的美在於您可以指定要使用ExecutionDataflowBlockOptions類同時處理多少請求ActionBlock<T>

下面是一個簡單的控制檯版本,他們會來在處理一堆數字,打印他們的名字和Thread.ManagedThreadID

private static void Main(string[] args) 
{ 
    var bufferBlock = new BufferBlock<int>(); 

    var actionBlock = 
     new ActionBlock<int>(i => Console.WriteLine("Reading number {0} in thread {1}", 
            i, Thread.CurrentThread.ManagedThreadId), 
          new ExecutionDataflowBlockOptions 
           {MaxDegreeOfParallelism = 5}); 

    bufferBlock.LinkTo(actionBlock); 
    Produce(bufferBlock); 

    Console.ReadKey(); 
} 

private static void Produce(BufferBlock<int> bufferBlock) 
{ 
    foreach (var num in Enumerable.Range(0, 500)) 
    { 
     bufferBlock.Post(num); 
    } 
} 

您也可以將它們異步,如果需要,可以使用awaitable BufferBlock.SendAsync

通過這種方式,您可以讓TPL爲您處理所有節流而無需手動執行。

+1

'BufferBlock'在這裏是多餘的。 – i3arnon 2014-08-29 10:37:02

+0

@ l3arnon您還將如何發佈數據? – 2014-08-29 10:42:31

+0

http://stackoverflow.com/a/25566931/885318 – i3arnon 2014-08-29 10:59:21

2

您可以使用BlockingCollection,它可以正常工作,但它是在async-await之前構建的,所以它會同步阻塞,在大多數情況下可能會縮小可擴展性。

您最好使用async準備TPL Dataflow作爲Yuval Itzchakov建議。所有你需要的是一個ActionBlock,隨着5 MaxDegreeOfParallelism同時處理每個項目,您發佈工作,它同步(block.Post(item))或異步(await block.SendAsync(item)):

private static void Main() 
{ 
    var block = new ActionBlock<Job>(
     async job => await job.ProcessAsync(), 
     new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 5}); 

    for (var i = 0; i < 50; i++) 
    { 
     block.Post(new Job()); 
    } 

    Console.ReadKey(); 
} 
+0

好的,這是一個簡潔的示例,我認爲這對我很有用,因爲我不需要多個接收器。題。因此,如果每個作業都從數據庫中獲取一些數據,而這些數據可能需要5到15分鐘,並且我做了job.ProcessAsync,這是否意味着動作塊將開始處理前5個數據,等待從數據庫獲取數據他們,然後排隊等待下一個5,或等到5中的任何一個實際完成,然後啓動下一個1並一次繼續執行1。 – 2014-08-29 14:30:18

+0

@AlexJ後者。 – 2014-08-29 14:40:21

+0

好的,謝謝。另外,如果我將LongRunning選項引入ExecuteDataFlowBlockOptions,除了在單獨的線程上啓動這些選項之外,它不會有任何區別,對嗎?有些可能會很長時間(如一小時),我聽說TPL有時可能會等待其他工作,如果我不這樣做。 – 2014-08-29 14:43:05

相關問題