2012-04-05 81 views
2

我已閱讀TPL文檔和許多教程,但沒有涵蓋我想實現的模型。.NET任務並行庫

某些算法總是有固定數量的迭代。

我需要不斷運行的線程(儘可能多的):

,而(真)

  • 從主線程獲取數據
  • 執行繁重耗時的任務(在單獨的線程)
  • 更新主線程信息

另外我需要的機制,將abl e設置鬧鐘(例如鬧鐘)。 5秒)。五秒鐘後,所有工作必須暫停一段時間,然後重新開始。

我應該使用Task.ContinueWith相同的任務嗎?但是我沒有處理之前任務啓動的結果,而是我更新了主線程中的數據結構,然後決定將要輸入的新任務迭代將會如何...

我該如何離開TPL決定應該有多少任務創造最佳效率?

不,我使用BackgroundWorkers,因爲他們有很好的RunEventCompleted事件 - 它裏面我在我的主線程,所以我可以更新我的MAIN結構,檢查時間約束,然後再次在完成的BackgroundWorker上調用StartAsync。這很好,很清楚,但可能非常不方便。 我需要使它在多處理器,多核服務器上高效運行。

一個問題是計算總是在線,從不停止。還有一些網絡,它可以遠程詢問MAIN結構的當前狀態。

第二個問題是關鍵時間控制(我必須有精確的計時器 - 當它停止時,沒有線程可以重新啓動)。然後在結束之後會有特殊的高優先級任務,所有工作都會恢復。

第三個問題是操作沒有上限。

這三個約束,從我所觀察到的,都不會沿着TPL走 - 我不能使用類似Parallel.For的東西,因爲集合是由任務本身的結果實時修改的...... 我沒有也知道如何結合:

  • 讓TPL決定多少線程應創建
  • 用sort線程的一生乳寧(有停頓和連續重新啓動之間的同步點)
  • 創建線程只有一次能力在開始時(他們應該只用不斷新的pa重新啓動rameters)

有人可以給我提示嗎? 我知道如何做到這一點,不好,不合時宜的方式。我描述了一些小的要求,這些要求阻止我這樣做。我有點困惑。

+0

看起來您似乎回答了您的原始問題,我不確定新的問題。 – Jodrell 2012-04-05 10:42:47

回答

3

你需要使用消息+演員+一個調度器IMO。然後你需要使用一種能夠勝任的語言。看看從Azure服務總線異步接收的this code,排隊在共享隊列中並通過actor來管理運行時狀態。

內聯:

我應該使用Task.ContinueWith相同的任務?

不,繼續在每個延續傳遞內部基於異常處理來讓你的程序被終止; TPL沒有好的方法將失敗的狀態編入調用端/主線程。

,但我不能處理前一個任務推出的結果,而是改爲 我 主線程更新數據結構,然後決定如何將新的任務 迭代的輸入...

除非你願意花很多時間來解決這個問題,否則你需要擺脫線程。

我該如何離開TPL決定應該爲 最佳效率創建多少任務?

這是由運行異步工作流程的框架處理的。

沒有我使用BackgroundWorkers,becase的,他們有很好的 RunEventCompleted事件 - 裏面我對我的主線程,所以我可以 更新我的主要結構,檢查時間限制並最終 呼叫StartAsync再次BackgroundWorker的完成。這是 很好,清楚,但可能非常無益。我需要在多處理器,多核服務器上使其效率更高。

一個問題是計算總是在線,從不停止。有 也是一些網絡,它可以遠程詢問目前的主要結構狀態 。第二個問題是關鍵的時間控制(我 必須有精確的計時器 - 當它停止,其中沒有線程可以 重新啓動)。

如果您以異步方式運行所有內容,則可以將消息傳遞給將其掛起的消息。你調度演員負責調用其所有訂閱者的調度消息;看看鏈接的代碼中的paused狀態。如果您有未解決的請求,您可以向他們傳遞取消令牌,並以這種方式處理「硬」取消/套接字中止。

然後在它結束後來特別高優先級的任務,所有 工作恢復。這兩個制約因素,從我觀察到的東西,不 沿着TPL順利 - 我不能使用類似的Parallel.For因爲 集合是通過任務本身實時的結果修改...

你可能需要一個叫做管道和過濾器的模式。你將你的意見輸入一連串的工人(演員);每個工人從另一個工人的產出中消耗。信令是通過一個控制信道完成的(在我的情況下,這是演員的收件箱)。

0

我想你應該閱讀

MSDN: How to implement a producer/consumer dataflow pattern

我有同樣的問題:一個生產者生產的項目,而一些消費者消耗他們,並決定將其發送給其他消費者。每個消費者都與其他消費者異步獨立工作。

您的主要任務是生產者。他生產你的其他任務應該處理的項目。與你的主要任務的代碼的類有一個函數:

public async Task ProduceOutputAsync(...) 

主程序使用啓動此任務:

var producerTask = Task.Run(() => MyProducer.ProduceOutputAsync(...) 

一旦這就是所謂的生產任務開始產生輸出。同時你的主程序可以繼續做其他事情,例如啓動消費者。

但我們先來關注Producer任務。

生產者任務生成要由其他任務處理的類型爲T的項目。使用實現ITargetBlock'的對象將它們轉移到其他任務。

每次生產者任務已完成創建類型T的對象將其發送到使用ITargetBlock.Post目標塊,或優選異步版本:

while (continueProducing()) 
{ 
    T product = await CreateProduct(...) 
    bool accepted = await this.TargetBlock(product) 
    // process the return value 
} 
// if here, nothing to produce anymore. Notify the consumers: 
this.TargetBlock.Complete(); 

生產者需要一個ITargetBlock <T>。在我的應用程序中,BufferBlock <T>就足夠了。檢查其他可能的目標的MSDN。

無論如何,數據流塊還應該實現ISourceBlock <T>。您的接收器等待輸入到達源,獲取並處理它。一旦完成,它可以將結果發送到其自己的目標塊,並等待下一個輸入,直到沒有輸入預期了。當然,如果你的消費者不生產產品,它不需要向目標發送任何東西。

等待輸入如下進行:

ISourceBlock`<T`> mySource = ...; 
while (await mySource.ReceiveAsync()) 
{ // a object of type T is available at the source 
    T objectToProcess = await mySource.ReceiveAsync(); 
    // keep in mind that someone else might have fetched your object 
    // so only process it if you've got it. 
    if (objectToProcess != null) 
    { 
     await ProcessAsync(objectToProcess); 

     // if your processing produces output send the output to your target: 
     var myOutput = await ProduceOutput(objectToprocess); 
     await myTarget.SendAsync(myOutput); 
    } 
} 
// if here, no input expected anymore, notify my consumers: 
myTarget.Complete(); 
  • 構建你的製片人
  • 構建所有消費者
  • 給生產BufferBlock其輸出發送到
  • 開始生產者MyProducer .ProduceOutputAsync(...)
  • 生產者生成輸出並將其發送到緩衝區塊:
  • 給消費者的一致BufferBlock
  • 啓動消費者作爲單獨的任務
  • 等待Task.WhenAll(...)等待所有任務的完成。

只要聽到沒有預期輸入,每個消費者都會停下來。 所有任務完成後,您的主要功能可以讀取結果並返回