2

我做了一個PCollection,作爲BigQuery處理後的管道的結果,現在我想使用與管道分開的那部分數據。如何將PCollection轉移到列表,以便我可以遍歷它並使用內容。如何將PCollection轉移到正常的列表

我從概念上做錯了什麼?

回答

1

一旦你完成了數據處理做了你的數據流的管道內,你很可能希望將數據寫入持久性存儲,如在雲存儲(GCS)文件,BigQuery中的表格,等

然後,您可以使用數據流之外的數據,例如將其讀取到列表中。顯然,它需要適應這一具體行動的記憶。

1

我會做的是創建「側輸出」(https://cloud.google.com/dataflow/model/par-do),這是您與主流程一起創建的另一個PCollection,因此最終您將擁有2個PCollections,作爲您的BQ過程的結果。

只要確保在您的過程函數中創建了添加元素到側面輸出集合的條件。事情是這樣的:

public final void processElement(final ProcessContext context) throws Exception { 
    context.output(bqProcessResult); 
    if (condition) { 
    context.sideOutput(myFilterTag, bqProcessResult); 
    } 
} 

這個過程的結果是不是PCollection但PCollectionTuple,所以你只需要做到以下幾點:

PCollectionTuple myTuples = previous process using the function above...; 
PCollection<MyType> bqCollection = myTuples.get(bqTag); 
PCollection<MyType> filteredCollection = myTuples.get(myFilterTag);