2017-03-08 66 views
2

我正在使用Google Cloud Dataflow並具有ParDo功能,需要訪問PCollection中的所有元素。爲了達到這個目的,我想將一個PCollection < T>轉換成一個PCollection < Iterable < T >>包含所有元素的單個Iterable。我想知道是否有一個更清潔/更簡單/更快的解決方案,我想出了。簡單的方法將PCollection <T>合併到PCollection中<Iterable<T>>

第一種方法是創建一個虛擬鍵,執行GroupByKey,然後獲取值。

PCollection<MyType> myData; 
// AddDummyKey() outputs KV.of(1, context.element()) for everything 
PCollection<KV<Integer, MyType>> myDataKeyed = myData.apply(ParDo.of(new AddDummyKey())); 
// Group by dummy key 
PCollection<KV<Integer, Iterable<MyType>>> myDataGrouped = myDataKeyed.apply(GroupByKey.create()); 
// Extract values 
PCollection<Iterable<MyType>> myDataIterable = myDataGrouped.apply(Values.<Iterable<MyType>>create() 

第二種方法遵循這裏的建議:How do I make View's asList() sortable in Google Dataflow SDK?但沒有排序。我創建了一個View.asList(),創建了一個虛擬PCollection,然後在視圖的虛擬PCollection上應用一個ParDo函數作爲側面輸入,並簡單地返回該視圖。

PCollection<MyType> myData; 
// Create view of the PCollection as a list 
PCollectionView<List<MyType>> myDataView = myData.apply(View.asList()); 
// Create dummy PCollection 
PCollection<Integer> dummy = pipeline.apply(Create.<Integer>of(1)); 
// Apply dummy ParDo that returns the view 
PCollection<List<MyType>> myDataList = dummy.apply(
     ParDo.withSideInputs(myDataView).of(new DoFn<Integer, List<MyType>>() { 
      @Override 
      public void processElement(ProcessContext c) { 
       c.output(c.sideInput(myDataView)); 
      } 
     })); 

似乎有一個預定義的組合函數用於此任務,但我找不到一個。謝謝您的幫助!

回答

1

如果你知道你需要整件事,那麼你的兩種方法都是合理的。兩者都已在Dataflow SDK中使用,後來在成爲Apache Beam SDK時使用。

  1. 側輸入然後輸出整個事情:事實上,這是如何工作的DataflowAssert。在Beam中,不同的後端跑步者可能會以不同的方式實現側面輸入,因此它應該更喜歡View.asIterable(),因爲它具有更少的假設,並且可以允許更大的側面輸入。
  2. 由一個單一的組鍵,然後放下鍵:這是如何梁的繼任者PAssert的作品。它完成同樣的事情,需要更多的關注空集合,但更多的梁運動員比側輸入支持(尤其是當他們是新的,仍在開發中)有更好的支持GroupByKey

因此,View.asIterable()基本上是爲了正是你所要求的。還有一些GroupGlobally轉換請求做第二個版本;這可能發生在某個時刻。

相關問題