2016-12-14 57 views
2

我有一個簡單的DataFlow java作業,它從.csv文件中讀取幾行代碼。每行包含一個數字單元格,表示在該行上必須執行某個功能的步驟數。在Google DataFlow管道中創建並行For循環的正確方法

我不想在函數中使用傳統的For循環執行該操作,以防這些數字變得非常大。使用並行友好的DataFlow方法做到這一點的正確方法是什麼?

下面是當前Java代碼:

public class SimpleJob{ 

    static class MyDoFn extends DoFn<String, Integer> { 

     public void processElement(ProcessContext c) { 
      String name = c.element().split("\\,")[0]; 
      int val = Integer.valueOf(c.element().split("\\,")[1]); 
      for (int i = 0; i < val; i++) // <- what's the preferred way to do this in DF? 
       System.out.println("Processing some function: " + name); // <- do something 
      c.output(val); 
     } 

    } 

    public static void main() { 

     DataflowPipelineOptions options = PipelineOptionsFactory 
       .as(DataflowPipelineOptions.class); 
     options.setProject(DEF.ID_PROJ); 
     options.setStagingLocation(DEF.ID_STG_LOC); 
     options.setRunner(DirectPipelineRunner.class); 

     Pipeline pipeline = Pipeline.create(options); 

     pipeline.apply(TextIO.Read.from("Source.csv")) 
       .apply(ParDo.of(new MyDoFn())); 

     pipeline.run(); 
    } 
} 

這就是「source.csv」的模樣(所以每個數字代表我想多少次運行在該線路上並行功能):

喬,3
瑪麗,4
彼得,2

回答

3

奇怪的是,這是Splittable DoFn的激勵用例之一!該API目前正在大量開發中。

然而,直到API是可用的,你基本上可以模仿的大部分東西它會爲你做:

class ElementAndRepeats { String element; int numRepeats; } 
PCollection<String> lines = p.apply(TextIO.Read....) 
PCollection<ElementAndRepeats> elementAndNumRepeats = lines.apply(
    ParDo.of(...parse number of repetitions from the line...)); 
PCollection<ElementAndRepeats> elementAndNumSubRepeats = elementAndNumRepeats 
    .apply(ParDo.of(
     ...split large numbers of repetitions into smaller numbers...)) 
    .apply(...fusion break...); 
elementAndNumSubRepeats.apply(ParDo.of(...execute the repetitions...)) 

其中:

  • 「分裂大量的重複的」是DoFn,例如,將ElementAndRepeats{"foo", 34}分割爲{ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 4}}
  • 融合突破 - 參見here,以防止多個ParDo被融合在一起,從而破壞並行化
+0

按照您的建議嘗試使用KV 而不是自定義的ElementsAndRepeats。工作得很好。還有兩個問題:(1)合適的微循環的數量是多少?你正在展示10,但是有什麼最佳做法?想象一下原始數字是幾十萬甚至幾百萬的數量? (2)你有沒有關於如何做融合突破的例子?我閱讀了你的鏈接,但從中看不太清楚。我在這裏發佈了相關問題的更新完整代碼:[link](http://stackoverflow.com/questions/41091713/sharing-bigtable-connection-object-among-dataflow-dofn-sub-classes) –

+0

1 - 它並不重要。 「1」會太低,因爲每個ElementAndRepeats會有一些開銷; 1M會太高,因爲你不會得到額外的並行化。幾十到幾千的數量可能會給你幾乎相同的性能。 2 - 一種方法是:https://github.com/apache/incubator-beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io /jdbc/JdbcIO.java#L307 – jkff