2017-04-06 43 views
2

我們正在嘗試使用TPL Dataflow框架的數據處理管道。阻止數據流的塊設計

管道的基本要點是:

  1. 迭代通過CSV文件在文件系統(10,000)
  2. 驗證我們尚未導入的內容,如果我們通過一個單一的內容忽略
  3. 迭代CSV文件(20,000-120,000行)並創建符合我們需要的數據結構。
  4. 批量處理這些新的dataStructured項目並將其推送到數據庫中
  5. 將CSV文件標記爲導入。

現在我們有一個現有的Python文件,它以非常緩慢的方式執行上述所有操作 - 代碼很混亂。我的想法如下TPL Dataflow

  1. BufferBlock<string>將所有文件發佈到
  2. TransformBlock<string, SensorDataDto>謂詞來檢測是否導入該文件
  3. TransformBlock<string, SensorDataDto>讀通過CSV文件,並創建SensorDataDto結構
  4. BatchBlock<SensorDataDto>用於TransformBlock委託來分批處理100個請求。

    4.5。 ActionBlock<SensorDataDto>將100條記錄推送到數據庫中。

  5. ActionBlock將CSV標記爲導入。

我創建了最初的幾個操作和他們正在(BufferBlock - >TransformBlock + Predicate & &過程中,如果還沒有),但我不知道該如何繼續流動,這樣我可以張貼100至內的BatchBlock並連線以下操作。

這看起來是否正確 - 基本要點,以及如何解決TPL數據流暢方式中的BufferBlock位?

bufferBlock.LinkTo(readCsvFile, ShouldImportFile) 
bufferBlock.LinkTo(DataflowBlock.NullTarget<string>()) 
readCsvFile.LinkTo(normaliseData) 
normaliseData.LinkTo(updateCsvImport) 
updateCsvImport.LinkTo(completionBlock) 

batchBlock.LinkTo(insertSensorDataBlock) 

bufferBlock.Completion.ContinueWith(t => readCsvFile.Complete()); 
readCsvFile.Completion.ContinueWith(t => normaliseData.Complete()); 
normaliseData.Completion.ContinueWith(t => updateCsvImport.Complete()); 
updateCsvImport.Completion.ContinueWith(t => completionBlock.Complete()); 

batchBlock.Completion.ContinueWith(t => insertSensorDataBlock.Complete()); 

裏面normaliseData方法我打電話BatchBlock.Post<..>(...),是一個很好的模式,還是應該有不同的結構?我的問題是,我只能在所有記錄被推入後纔將文件標記爲正在導入。

Task.WhenAll(bufferBlock.Completion, batchBlock.Completion).Wait(); 

如果我們有一批100,如果80被推進,有沒有辦法排最後80

我不確定我是否應該在主管道中連接BatchBlock,我會等到兩個都完成。

回答

1

首先,你並不需要在這個問題上使用Completion,你可以鏈接過程中使用PropagateCompletion屬性:

// with predicate 
bufferBlock.LinkTo(readCsvFile, new DataflowLinkOptions { PropagateCompletion = true }, ShouldImportFile); 
// without predicate 
readCsvFile.LinkTo(normaliseData, new DataflowLinkOptions { PropagateCompletion = true }); 

現在,回到你的問題批次。也許,您可以在這裏使用JoinBlock<T1, T2>BatchedJoinBlock<T1, T2>,將它們附加到您的管道中並收集連接的結果,以便您瞭解正在完成的工作的全貌。也許你可以實現你自己的ITargetBlock<TInput>,這樣你就可以用自己的方式消費這些消息。

official docs,塊是貪婪的,並收集來自連接的一種,只要它變爲可用的數據,所以加入塊可能卡住,如果一個目標是準備和其他不,或批處理塊具有批量大小的80%,所以你需要把它放在你的腦海裏。如果您自己實施,您可以使用ITargetBlock<TInput>.OfferMessage方法從您的來源獲取信息。

BatchBlock<T>能夠在貪婪和非貪婪模式下執行。在默認貪婪模式下,從任意數量的源提供給塊的所有消息都被接受並緩衝以轉換成批。

在非貪婪模式下,所有消息都會從源中推遲,直到有足夠的源向塊提供消息來創建批處理。因此,可以使用BatchBlock<T>來接收來自N源中的每一個的1元素,來自1源的N元素以及它們之間的無數選項。