2016-08-13 174 views
5

我在我的應用程序中實現了使用TPL Dataflow的生產者/消費者模式。我有大約40個塊的大數據流網格。網格中有兩個主要的功能部分:生產者部分和消費者部分。有時,消費者在處理來訪工作時應該連續爲生產者提供大量的工作。當消費者忙於某些指定數量的工作項目時,我想暫停生產者。否則,應用會消耗大量的內存/ CPU,並且行爲不可持續。當消費者不知所措時,如何讓快速製片人暫停?

我做了演示應用程序演示了這個問題:

mesh

using System; 
using System.Linq; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace DataflowTest 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var options = new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 4, 
       EnsureOrdered = false 
      }; 

      var boundedOptions = new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 4, 
       EnsureOrdered = false, 
       BoundedCapacity = 5 
      }; 

      var bufferBlock = new BufferBlock<int>(boundedOptions); 
      var producerBlock = new TransformBlock<int, int>(x => x + 1, options); 
      var broadcastBlock = new BroadcastBlock<int>(x => x, options); 

      var consumerBlock = new ActionBlock<int>(async x => 
      { 
       var delay = 1000; 
       if (x > 10) delay = 5000; 

       await Task.Delay(delay); 

       Console.WriteLine(x); 
      }, boundedOptions); 

      producerBlock.LinkTo(bufferBlock); 
      bufferBlock.LinkTo(broadcastBlock); 
      broadcastBlock.LinkTo(producerBlock); 
      broadcastBlock.LinkTo(consumerBlock); 

      bufferBlock.Post(1); 

      consumerBlock.Completion.Wait();    
     }   
    } 
} 

該應用程序打印是這樣的:

2 
1 
3 
4 
5 
69055 
69053 
69054 
69057 
438028 
438040 
142303 
438079 

這意味着生產商都在旋轉,推動信息消費。我希望它暫停並等待消費者完成當前部分工作,然後生產者應繼續爲消費者提供消息。

我的問題是類似於其他question的報價,但沒有正確回答。我嘗試過這個解決方案,它在這裏不起作用,允許生產者用消息來淹沒消費者。同樣設置BoundedCapacity也不起作用。

我猜到目前爲止唯一的解決方案是製作我自己的塊,它將監視目標塊隊列並根據目標塊的隊列行爲。但我希望對這個問題有點矯枉過正。

+0

你有沒有考慮過使用'Rx'?看看這個答案:http://stackoverflow.com/questions/2542764/tpl-vs-reactive-framework –

+0

我希望不會有這樣的需要,因爲大量的時間花在數據流上,它適合我的需求很好。 – kseen

+0

在您的演示中,製作人可以自己製作所有的信息,而無需通過廣播塊接收來自其自身的信息。你的真實代碼是否也是這樣,還是生產者→生產者循環是必要的? – svick

回答

4

如果您需要保持生產者→緩衝區→廣播週期不變,那麼您需要將廣播塊替換爲仍然廣播其接收到的消息的其他塊,但在其中一個目標已滿時等待。

只要你知道塊的目標,當你創造它,你可以使用ActionBlock(代碼another answer of mine複製)構建它:

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    DataflowBlockOptions options, params ITargetBlock<T>[] targets) 
{ 
    var block = new ActionBlock<T>(
     async item => 
     { 
      foreach (var target in targets) 
      { 
       await target.SendAsync(item); 
      } 
     }, new ExecutionDataflowBlockOptions 
     { 
      BoundedCapacity = options.BoundedCapacity, 
      CancellationToken = options.CancellationToken 
     }); 

    block.Completion.ContinueWith(task => 
    { 
     foreach (var target in targets) 
     { 
      if (task.Exception != null) 
       target.Fault(task.Exception); 
      else 
       target.Complete(); 
     } 
    }); 

    return block; 
} 

利用這一點,你可以聲明廣播塊:

var broadcastBlock = CreateGuaranteedBroadcastBlock(
    boundedOptions, producerBlock, consumerBlock); 

(你也將需要刪除LinkTo線從broadcastBlock鏈接。)

您的原始代碼中有一個問題沒有完成,但這是TPL數據流中一個難以解決的問題,通常使用週期。

+0

關於完成,如果我的網絡將連續?就像將來沒有任何完成,它應該在應用程序工作時繼續工作。 – kseen

+0

我剛剛在我的演示應用程序中試過這種'GuaranteedBroadcastBlock',它的功能就像一個魅力!完善!非常感謝。 – kseen

+0

這是最好的情況:你不需要完成,所以很好,它不起作用。 – svick

0

它看起來像你的製作人生成一個序列,所以不需要整個生產者→緩衝→廣播週期。取而代之的是,所有這三個塊可以通過一個async循環,產生的下一個項目來替換,然後將其發送到使用await SendAsync()消費者:

Task.Run(async() => 
{ 
    int i = 1; 
    while (true) 
    { 
     await consumerBlock.SendAsync(i); 
     i++; 
    } 
    consumerBlock.Complete(); 
}); 

這樣,一旦消費者達到其容量,await SendAsync()將確保生產者等待消費者消費一件物品。

如果您想將此生產者封裝到數據流塊中,以便您可以將其鏈接到消費者,you can

+0

我真正的「製作人」是一組加載評論頁面(包含到下一個評論頁面的鏈接)的塊,解析當前評論頁面的內容,將評論發送給消費者並再次開始這個循環,通過地址下一個評論頁面到這個生產者週期的第一個塊。所以,不幸的是,這不僅僅是一個序列。它就像鏈接序列,其中序列中的每個元素都具有到其中下一個元素的地址,並且序列中的最後一個元素沒有下一個元素的地址。對不起,這個問題很簡單。 – kseen

+0

我剛剛製作了代表真實情況的圖表。這裏你去:http://imgur.com/iEklfeG – kseen

相關問題