我試圖使用BlockingCollection<T>
實現生產者/消費者模式,所以我寫了一個簡單的控制檯應用程序來測試它。如何正確使用BlockingCollection.GetConsumingEnumerable?
public class Program
{
public static void Main(string[] args)
{
var workQueue = new WorkQueue();
workQueue.StartProducingItems();
workQueue.StartProcessingItems();
while (true)
{
}
}
}
public class WorkQueue
{
private BlockingCollection<int> _queue;
private static Random _random = new Random();
public WorkQueue()
{
_queue = new BlockingCollection<int>();
// Prefill some items.
for (int i = 0; i < 100; i++)
{
//_queue.Add(_random.Next());
}
}
public void StartProducingItems()
{
Task.Run(() =>
{
_queue.Add(_random.Next()); // Should be adding items to the queue constantly, but instead adds one and then nothing else.
});
}
public void StartProcessingItems()
{
Task.Run(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
Console.WriteLine("Worker 1: " + item);
}
});
Task.Run(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
Console.WriteLine("Worker 2: " + item);
}
});
}
}
但是有3個問題,我的設計:
我不知道堵/守候在我的主要方法的正確途徑。做一個簡單的空while循環看起來非常低效,CPU使用浪費只是爲了確保應用程序不會結束。
我的設計還存在另一個問題,在這個簡單的應用程序中,我有一個生產者無限期地生產物品,並且不應該停下來。在真實世界的設置中,我希望它最終結束(例如,用完文件處理)。在那種情況下,我應該如何等待它在Main方法中完成?製作
StartProducingItems
async
然後await
呢?GetConsumingEnumerable
或Add
未按預期工作。生產者應該不斷添加物品,但是它會添加一個物品,然後再也不會添加物品。這一個項目然後由其中一個消費者處理。兩個消費者然後阻止等待物品被添加,但沒有一個是。我知道Take
方法,但在while循環中再次旋轉Take
看起來非常浪費和低效。有一個CompleteAdding
方法,但它不允許添加任何其他內容,如果嘗試,則會引發異常,因此不適用。
我肯定知道這兩個消費者其實阻塞和等待新的項目,我可以在調試過程中的線程之間進行切換:
編輯:
我我在其中一條評論中提出了修改建議,但Task.WhenAll
仍然立即返回。
public Task StartProcessingItems()
{
var consumers = new List<Task>();
for (int i = 0; i < 2; i++)
{
consumers.Add(Task.Run(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
Console.WriteLine($"Worker {i}: " + item);
}
}));
}
return Task.WhenAll(consumers.ToList());
}
'生產者應該不斷添加物品,但它會添加一個物品,然後再也不會添加。這一個項目然後由其中一個消費者處理。這兩個消費者然後阻止等待物品被添加,但沒有一個是'呃,是的。你還期待什麼?消費者被阻止直到物品被添加,這就是爲什麼它被稱爲阻塞收集 –
不,問題是生產者增加一個物品然後停止。 – user9993
這就是你編碼的方式。您正在開始添加單個項目的任務。你錯過了一個循環或什麼? –