2014-09-02 114 views
0

我想建立在this問題上。TPL數據流bufferblock消息發送定時器

到目前爲止,我已經得到了並行處理作業的方法。我在一個控制檯應用程序中運行這個。我從數據庫中獲得50個工作,使用TPL DataFlow處理它們,到目前爲止這麼好。但是我意識到,如果有一項工作需要一小時的時間才能完成,其餘工作將在15分鐘內完成,那麼控制檯應用程序將繼續工作一個小時而不處理任何其他工作。我無法將其更改爲Windows服務,因此我必須使控制檯應用程序處理新作業進入,可能每15分鐘檢查一次。

我可以啓動一個計時器,每15分鐘檢查一次新的工作。如果db中有新的工作,我需要添加到buffer block,所以actionblock可以處理它。問題是,在添加前50個作業之後,必須爲緩衝區和操作塊調用complete和completion.wait。所以我不能再添加新的作業到現有的緩衝區。

我可以檢查當前的actionblock's isCompleted屬性,然後動態創建另一個組合buffer/actionblock。基本上,條件是如果當前actionblock仍在繼續,請檢查timer上的新工作並創建新的buffer/actionblock組合。這就是我打算做的。但在我走下這條路之前,還有另一種方法可以解決這個問題嗎?

回答

1

如果我理解正確,並且所有你想要的是同時執行的作業的一個「流」,但不超過50個作業等待執行,你可以使用相同的ActionBlockBoundedCapacity,並添加到它時您可以:

private static Task ProcessJobsAsync(CancellationToken cancellationToken) 
{ 
    var block = new ActionBlock<Job>(
     job => job.Process(), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = Environment.ProcessorCount, // Or any other value that fits 
      BoundedCapacity = 50, 
     }); 
    cancellationToken.Register(block.Complete); 
    var producer = Task.Run(async() => 
    { 
     while (!cancellationToken.IsCancellationRequested) 
     { 
      foreach (var job in await GetJobsAsync()) 
      { 
       await block.SendAsync(job,cancellationToken); 
      } 
     } 
    }); 

    return Task.WhenAll(producer, block.Completion); 
} 

如果塊是緩慢的,並且達到它的將異步等到空間在隊列中的其他作業清除能力wait block.SendAsync(job,cancellationToken);。這樣你總是有工作要執行。當你想關閉應用程序(或取消操作),你可以發信號通知使用CancellationToken

+0

我想我可能沒有100%清楚。控制檯作業每5分鐘運行一次計時器。這個想法是處理新的工作,因爲他們每次運行最多可以有50個工作。有時只需10個工作時間不到5分鐘即可完成,有時可能是30個,有些則需要1個小時。因此,如果一項工作需要一個小時,則控制檯應用程序將運行一個小時(下一個控制檯實例將一直等到當前完成)。但是,如果只有一個工作需要一個小時,並在15分鐘內完成休息,我想在同一個控制檯作業實例中繼續檢查新作業 - 這就是爲什麼計時器。 – 2014-09-02 18:12:56

+0

它不是一個需要執行的工作流。控制檯作業在處理50(或有多少可用)後結束,並在檢查頻率上再次喚醒。 – 2014-09-02 18:14:06

+1

@AlexJ所以你每15分鐘運行一次你的應用程序?爲什麼不只是運行一次,它將確保始終獲得新項目進行處理? – i3arnon 2014-09-02 18:25:05