2017-08-29 40 views
5

我有兩個BlockingCollection<T>對象,collection1collection2。我想要消耗這些集合中的物品,並優先處理collection1中的物品。也就是說,如果兩個藏品都有物品,我想首先從collection1獲取物品。如果他們都沒有物品,我想等待物品可用。如何從任何兩個BlockingCollections中獲取優先於第一個集合的項目?

我有以下代碼:

public static T Take<T>(
    BlockingCollection<T> collection1, 
    BlockingCollection<T> collection2) where T:class 
{ 
    if (collection1.TryTake(out var item1)) 
    { 
     return item1; 
    } 

    T item2; 

    try 
    { 
     BlockingCollection<T>.TakeFromAny(
      new[] { collection1, collection2 }, 
      out item2); 
    } 
    catch (ArgumentException) 
    { 
     return null; 
    } 

    return item2; 
} 

此代碼有望重返nullCompleteAdding叫上兩個集合,它們都是空的。

我用這個代碼的主要問題是,對於TakeFromAny方法的文檔指定TakeFromAny將引發ArgumentException如果CompleteAdding被稱爲的「集合」:

的ArgumentException

館藏的說法是一個0長度的數組或包含一個null元素或CompleteAdding()已被調用集合。

如果在任何集合上調用CompleteAdding,它會拋出嗎?或兩個集合?

如果CompleteAdding被調用並且收集仍然有一些項目,它會拋出?

我需要一個可靠的方法來做到這一點。

在此代碼中,我試圖從collection1開始,因爲TakeFromAny的文檔沒有對如果這兩個集合具有項目的項目採集順序提供任何保證。

這也意味着如果兩個收藏都是空的,然後他們在同一時間收到物品,那麼我可能會首先從collection2獲得物品,這很好。

編輯:

我之所以將項目添加到兩個集合(而不是簡單的單一集合)是第一個集合不具有上限,並且第二收集一樣。

更多細節爲那些有興趣誰,爲什麼我需要這樣的:

我在叫ProceduralDataflow一個開源項目使用此。在這裏看到更多的細節https://github.com/ymassad/ProceduralDataflow

數據流系統中的每個處理節點有兩個集合,一個集合將包含第一次進來的項目(所以我需要放慢生產者,如果需要),另一個集合將包含項目進入第二次(或第三次,......)次(由於數據流中的循環)。

一個集合沒有上限的原因是我不想因數據流中的循環造成死鎖。

回答

3

首先,簡單回答您的具體問題。

如果在任何集合上調用CompleteAdding,它會拋出嗎?或兩個集合?

Both(all) - 但僅限於任何集合中沒有可用元素時。

如果CompleteAdding被調用且收集仍然有一些項目,它會拋出?

不。如果集合中有可用元素,它將從集合中刪除並返回給調用者。

結論

顯然,文檔還不清楚。該部分

CompleteAdding()已呼籲收集

應該已經制定不同的 - 像

或存在的任何沒有可用的元素收藏和CompleteAdding()已撥打全部收藏

理由

嗯,我知道依靠實施不是一個好的做法,但是當文檔是不清楚的,實行的是唯一可靠的官方消息,我能想到的。所以採取reference sourceTakeFromAnyTryTakeFromAny調用私人方法TryTakeFromAnyCore。它開始於以下情況:

ValidateCollectionsArray(collections, false); 

false這裏是一個bool參數調用isAddOperation和使用ValidateCollectionsArray裏面如下:

if (isAddOperation && collections[i].IsAddingCompleted) 
{ 
    throw new ArgumentException(
     SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections"); 
} 

這是可能的地方扔ArgumentException用於收藏與CompleteAdding()之一被稱爲。正如我們所看到的,情況並非如此(問題1)。

然後執行繼續下面的「快速通道」:

//try the fast path first 
for (int i = 0; i < collections.Length; i++) 
{ 
    // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count 
    if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item)) 
     return i; 
} 

這證明了問題的答案#2。

最後,如果沒有可用的元素的所有藏品,實施以「慢速路」通過調用另一個私有方法TryTakeFromAnyCoreSlow,用下面的評論是在實施行爲的基本解釋:

//Loop until one of these conditions is met: 
// 1- The operation is succeeded 
// 2- The timeout expired for try* versions 
// 3- The external token is cancelled, throw 
// 4- The operation is TryTake and all collections are marked as completed, return false 
// 5- The operation is Take and all collection are marked as completed, throw 

對於我們的問題的答案是#1和情況#5(注意單詞全部)。順便說一句,它也顯示了TakeFromAnyTryTakeFromAny - 情況#4和#5之間的唯一區別,即throwreturn -1

+0

謝謝。這樣做是有道理的,但文件不清楚。在'TryTakeFromAnyCoreSlow'方法中,第5個條件表示「被標記爲完成」。我假設這意味着'CompleteAdding'被調用並且集合是非空的。我將代碼追蹤到'GetHandles'方法,我認爲情況就是這樣。 –

+0

我希望MSDN上有一些文檔來澄清所有這些。 –

+0

確實。基本上['IsCompleted'](https://msdn.microsoft.com/en-us/library/dd267315(v = vs.110).aspx)屬性 - *此集合是否已標記爲完成添加,並且空。* –

相關問題