我正在使用Google Cloud Dataflow並具有ParDo功能,需要訪問PCollection中的所有元素。爲了達到這個目的,我想將一個PCollection < T>轉換成一個PCollection < Iterable < T >>包含所有元素的單個Iterable。我想知道是否有一個更清潔/更簡單/更快的解決方案,我想出了。簡單的方法將PCollection <T>合併到PCollection中<Iterable<T>>
第一種方法是創建一個虛擬鍵,執行GroupByKey,然後獲取值。
PCollection<MyType> myData;
// AddDummyKey() outputs KV.of(1, context.element()) for everything
PCollection<KV<Integer, MyType>> myDataKeyed = myData.apply(ParDo.of(new AddDummyKey()));
// Group by dummy key
PCollection<KV<Integer, Iterable<MyType>>> myDataGrouped = myDataKeyed.apply(GroupByKey.create());
// Extract values
PCollection<Iterable<MyType>> myDataIterable = myDataGrouped.apply(Values.<Iterable<MyType>>create()
第二種方法遵循這裏的建議:How do I make View's asList() sortable in Google Dataflow SDK?但沒有排序。我創建了一個View.asList(),創建了一個虛擬PCollection,然後在視圖的虛擬PCollection上應用一個ParDo函數作爲側面輸入,並簡單地返回該視圖。
PCollection<MyType> myData;
// Create view of the PCollection as a list
PCollectionView<List<MyType>> myDataView = myData.apply(View.asList());
// Create dummy PCollection
PCollection<Integer> dummy = pipeline.apply(Create.<Integer>of(1));
// Apply dummy ParDo that returns the view
PCollection<List<MyType>> myDataList = dummy.apply(
ParDo.withSideInputs(myDataView).of(new DoFn<Integer, List<MyType>>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.sideInput(myDataView));
}
}));
似乎有一個預定義的組合函數用於此任務,但我找不到一個。謝謝您的幫助!