2
我有一個簡單的DataFlow java作業,它從.csv文件中讀取幾行代碼。每行包含一個數字單元格,表示在該行上必須執行某個功能的步驟數。在Google DataFlow管道中創建並行For循環的正確方法
我不想在函數中使用傳統的For循環執行該操作,以防這些數字變得非常大。使用並行友好的DataFlow方法做到這一點的正確方法是什麼?
下面是當前Java代碼:
public class SimpleJob{
static class MyDoFn extends DoFn<String, Integer> {
public void processElement(ProcessContext c) {
String name = c.element().split("\\,")[0];
int val = Integer.valueOf(c.element().split("\\,")[1]);
for (int i = 0; i < val; i++) // <- what's the preferred way to do this in DF?
System.out.println("Processing some function: " + name); // <- do something
c.output(val);
}
}
public static void main() {
DataflowPipelineOptions options = PipelineOptionsFactory
.as(DataflowPipelineOptions.class);
options.setProject(DEF.ID_PROJ);
options.setStagingLocation(DEF.ID_STG_LOC);
options.setRunner(DirectPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.Read.from("Source.csv"))
.apply(ParDo.of(new MyDoFn()));
pipeline.run();
}
}
這就是「source.csv」的模樣(所以每個數字代表我想多少次運行在該線路上並行功能):
喬,3
瑪麗,4
彼得,2
按照您的建議嘗試使用KV而不是自定義的ElementsAndRepeats。工作得很好。還有兩個問題:(1)合適的微循環的數量是多少?你正在展示10,但是有什麼最佳做法?想象一下原始數字是幾十萬甚至幾百萬的數量? (2)你有沒有關於如何做融合突破的例子?我閱讀了你的鏈接,但從中看不太清楚。我在這裏發佈了相關問題的更新完整代碼:[link](http://stackoverflow.com/questions/41091713/sharing-bigtable-connection-object-among-dataflow-dofn-sub-classes) –
1 - 它並不重要。 「1」會太低,因爲每個ElementAndRepeats會有一些開銷; 1M會太高,因爲你不會得到額外的並行化。幾十到幾千的數量可能會給你幾乎相同的性能。 2 - 一種方法是:https://github.com/apache/incubator-beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io /jdbc/JdbcIO.java#L307 – jkff