2016-11-19 33 views
2

.NET中有一個BlockingCollection<T>.TakeFromAny方法。它首先嚐試快速獲取Take,然後默認爲等待基礎句柄的「緩慢」方法。我想用它來傾聽提供「消息」的上游生產商和提供「結果」的下游生產商。BlockingCollection <T> .TakeFromAny,對於具有不同通用類型的集合

  • Can TakeFromAny可以使用 - 或者是有另外一種方式,不需要重新實現 - 可以監聽添加異構類型Blocking Collections的集合嗎?

下面的代碼是類型有效,自然無法編譯:

object anyValue; 
var collection = new List<BlockingCollection<object>>(); 
// following fails: cannot convert 
// from 'System.Collections.Concurrent.BlockingCollection<Message>' 
// to 'System.Collections.Concurrent.BlockingCollection<object>' 
collection.Add(new BlockingCollection<Message>()); 
// fails for same reason 
collection.Add(new BlockingCollection<Result>()); 
BlockingCollection<object>.TakeFromAny(collection.ToArray(), out anyValue); 

這將有可能對採取只處理new BlockingCollection<object>()情況和投,以避免編譯類型錯誤,雖然這樣的錯誤(呃) - 尤其是因爲打字通過方法界面丟失了。使用包裝組合類型將解決後者; fsvo'解決'。


這裏沒有什麼東西直接與問題直接相關,雖然它提供了上下文 - 對於那些有興趣的人。提供核心基礎架構功能的代碼無法使用更高級別的構造(例如Rx或TPL Dataflow)。

這裏是一個基本的流程模型。生產者,代理和工作者在不同的線程上運行(這些工作者可以在同一個線程上運行,具體取決於任務調度器的功能)。

[producer] message --> [proxy] message --> [worker 1] 
      <-- results    <-- results 
            message --> [worker N..] 
            <-- results 

期望是代理偵聽消息(傳入)和結果(返回)。代理完成一些工作,如轉換和分組,並將結果用作反饋。

把代理作爲一個單獨的線程將它從最初的生產源中分離出來,從而完成各種猴子業務。工作任務是爲了並行性,而不是異步性,線程化(在爭用被減少/消除後,儘管代理中的分組)應該允許良好的擴展。

隊列在代理和工作人員之間建立(而不是具有單個輸入/結果的直接任務),因爲在工作人員執行時,可能會有額外的傳入工作消息,它可以在結束之前處理。這是爲了確保工作人員能夠延長/重用其在相關工作流上建立的上下文。

+0

1.是否可以阻止每個集合的線程? 2。你需要保證「只從一個集合中獲取」操作,還是更像「我想處理這兩個集合中的所有項目,但我不想並行執行」? – svick

+0

@svick上游生產者(寫在'消息'隊列中)和扇出下遊消費者/生產者(他們在'結果'隊列中產生結果)可以被阻止 - 這個「代理」消費者/生產者在單獨運行線程上下文,並在轉換後將消息從上游移動到下游消費者,並將結果作爲反饋移回上游。目標是「等待接下來的事情」,從任何隊列開始,然後等待更多。 – user2864740

+0

@svick我想我最初的想法/設計是針對「使用類型化阻止集合進行民意調查/選擇」。 – user2864740

回答

1

我認爲這裏最好的選擇是將兩個阻塞集合的類型更改爲BlockingCollection<object>,您已經提到過,包括其缺點。

如果您不能或不想做,另一個解決辦法是有一個合併BlockingCollection<object>併爲每個源集合項目移動從收集到的合併一個線程:

var producerCollection = new BlockingCollection<Message>(); 
var consumerCollection = new BlockingCollection<Results>(); 

var combinedCollection = new BlockingCollection<object>(); 

var producerCombiner = Task.Run(() => 
{ 
    foreach (var item in producerCollection.GetConsumingEnumerable()) 
    { 
     combinedCollection.Add(item); 
    } 
}); 

var consumerCombiner = Task.Run(() => 
{ 
    foreach (var item in consumerCollection.GetConsumingEnumerable()) 
    { 
     combinedCollection.Add(item); 
    } 
}); 

Task.WhenAll(producerCombiner, consumerCombiner) 
    .ContinueWith(_ => combinedCollection.CompleteAdding()); 

foreach (var item in combinedCollection.GetConsumingEnumerable()) 
{ 
    // process item here 
} 

這不是非常有效,因爲它阻止了兩個額外的線程來完成這個任務,但它是我可以想到的最好的選擇,而無需使用反射來訪問TakeFromAny使用的手柄。

+0

我決定與'BlockingCollection '路線一起走更多的守衛訪問。在這個過程中,我發現TakeFromAny是相當有偏見的。它總是傾向於第一個收集(如果他們有快速的收集,或者如果多個等待處理同時發出信號,則慢速收回)。 – user2864740

相關問題