0

我試圖使用BlockingCollection<T>實現生產者/消費者模式,所以我寫了一個簡單的控制檯應用程序來測試它。如何正確使用BlockingCollection.GetConsumingEnumerable?

public class Program 
{ 
    public static void Main(string[] args) 
    { 
     var workQueue = new WorkQueue(); 
     workQueue.StartProducingItems(); 
     workQueue.StartProcessingItems(); 

     while (true) 
     { 

     } 
    } 
} 

public class WorkQueue 
{ 
    private BlockingCollection<int> _queue; 
    private static Random _random = new Random(); 

    public WorkQueue() 
    { 
     _queue = new BlockingCollection<int>(); 

     // Prefill some items. 
     for (int i = 0; i < 100; i++) 
     { 
      //_queue.Add(_random.Next()); 
     } 
    } 

    public void StartProducingItems() 
    { 
     Task.Run(() => 
     { 
      _queue.Add(_random.Next()); // Should be adding items to the queue constantly, but instead adds one and then nothing else. 
     }); 
    } 

    public void StartProcessingItems() 
    { 
     Task.Run(() => 
     { 
      foreach (var item in _queue.GetConsumingEnumerable()) 
      { 
       Console.WriteLine("Worker 1: " + item); 
      } 
     }); 

     Task.Run(() => 
     { 
      foreach (var item in _queue.GetConsumingEnumerable()) 
      { 
       Console.WriteLine("Worker 2: " + item); 
      } 
     }); 
    } 
} 

但是有3個問題,我的設計:

  • 我不知道堵/守候在我的主要方法的正確途徑。做一個簡單的空while循環看起來非常低效,CPU使用浪費只是爲了確保應用程序不會結束。

  • 我的設計還存在另一個問題,在這個簡單的應用程序中,我有一個生產者無限期地生產物品,並且不應該停下來。在真實世界的設置中,我希望它最終結束(例如,用完文件處理)。在那種情況下,我應該如何等待它在Main方法中完成?製作StartProducingItemsasync然後await呢?

  • GetConsumingEnumerableAdd未按預期工作。生產者應該不斷添加物品,但是它會添加一個物品,然後再也不會添加物品。這一個項目然後由其中一個消費者處理。兩個消費者然後阻止等待物品被添加,但沒有一個是。我知道Take方法,但在while循環中再次旋轉Take看起來非常浪費和低效。有一個CompleteAdding方法,但它不允許添加任何其他內容,如果嘗試,則會引發異常,因此不適用。

我肯定知道這兩個消費者其實阻塞和等待新的項目,我可以在調試過程中的線程之間進行切換:

enter image description here

編輯:

我我在其中一條評論中提出了修改建議,但Task.WhenAll仍然立即返回。

public Task StartProcessingItems() 
{ 
    var consumers = new List<Task>(); 

    for (int i = 0; i < 2; i++) 
    { 
     consumers.Add(Task.Run(() => 
     { 
      foreach (var item in _queue.GetConsumingEnumerable()) 
      { 
       Console.WriteLine($"Worker {i}: " + item); 
      } 
     })); 
    } 

    return Task.WhenAll(consumers.ToList()); 
} 
+0

'生產者應該不斷添加物品,但它會添加一個物品,然後再也不會添加。這一個項目然後由其中一個消費者處理。這兩個消費者然後阻止等待物品被添加,但沒有一個是'呃,是的。你還期待什麼?消費者被阻止直到物品被添加,這就是爲什麼它被稱爲阻塞收集 –

+0

不,問題是生產者增加一個物品然後停止。 – user9993

+0

這就是你編碼的方式。您正在開始添加單個項目的任務。你錯過了一個循環或什麼? –

回答

4

GetConsumingEnumerable()正在阻塞。如果你想添加到隊列不斷,你應該把調用_queue.Add在一個循環:

public void StartProducingItems() 
{ 
    Task.Run(() => 
    { 
     while (true) 
      _queue.Add(_random.Next()); 
    }); 
} 

關於Main()方法,你可以調用Console.ReadLine()方法來防止主線程完成您按下前關鍵:

public static void Main(string[] args) 
{ 
    var workQueue = new WorkQueue(); 
    workQueue.StartProducingItems(); 
    workQueue.StartProcessingItems(); 

    Console.WriteLine("Press a key to terminate the application..."); 
    Console.ReadLine(); 
} 
+0

只要我按下回車鍵,它就會結束。一定有更好的方法。也許製作的StartProducingItems應該是異步的? – user9993

+0

@ user9993公開你在'StartProcessingItems'中創建的任務,然後在Main()中調用'.Wait()'。如果您有多個工作人員,則使用Task.WhenAll創建一個聚合任務,然後將其公開出來 –

+0

如果您從WorkQueue類中公開任務,您可以等待這些任務。但你如何告訴生產者停止生產? – mm8