2017-07-03 48 views
0

按照Apache Beam 2.0.0 SDK DocumentationGroupIntoBatches只能與KV集合。GroupIntoBatches非KV元素

我的數據集只包含值,不需要引入密鑰。然而,要利用GroupIntoBatches我不得不實施一個空字符串作爲關鍵的「假」鍵:

static class FakeKVFn extends DoFn<String, KV<String, String>> { 
    @ProcessElement 
    public void processElement(ProcessContext c) { 
    c.output(KV.of("", c.element())); 
    } 
} 

所以整體管道如下所示:

public static void main(String[] args) { 
    PipelineOptions options = PipelineOptionsFactory.create(); 
    Pipeline p = Pipeline.create(options); 

    long batchSize = 100L; 

    p.apply("ReadLines", TextIO.read().from("./input.txt")) 
     .apply("FakeKV", ParDo.of(new FakeKVFn())) 
     .apply(GroupIntoBatches.<String, String>ofSize(batchSize)) 
     .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of()))) 
     .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() { 
     @ProcessElement 
     public void processElement(ProcessContext c) { 
      c.output(callWebService(c.element().getValue())); 
     } 
     })) 
     .apply("WriteResults", TextIO.write().to("./output/")); 

    p.run().waitUntilFinish(); 
} 

有什麼辦法在不引入「假」鍵的情況下分批分組?

回答

2

需要將KV輸入提供給GroupIntoBatches,因爲轉換是使用狀態和定時器(每個按鍵和窗口)實現的。

對於每個鍵+窗口對,狀態和計時器必須連續執行(或可觀察地執行)。你必須通過提供密鑰(和窗口,儘管沒有我知道的在Windows上並行運行的runner)來手動表示可用的並行性。兩種最常見的方法是:

  1. 使用如用戶ID
  2. 一些自然鍵選擇碎片和重點的一些固定數量的隨機。這可能很難調整。你必須有足夠的分片才能獲得足夠的並行性,但每個分片都需要包含足夠的數據,GroupIntoBatches實際上是有用的。

將一個虛擬鍵添加到所有元素中,就像在片段中一樣,將導致轉換不會並行執行。這與Stateful indexing causes ParDo to be run single-threaded on Dataflow Runner的討論類似。