2016-09-21

TL;DR: "Inputs to Flatten had incompatible window windowFns" when using CoGroupByKey with CalendarWindows

How can I CoGroupByKey a set of PCollections that all use the same CalendarWindows windowing strategy?

Long version

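I am writing a Dataflow pipeline that reads from two different Pub/Sub sources. One of the resulting PCollections is split into a PCollectionTuple, and finally I try to join them in a CoGroupByKey before saving the result to BigQuery.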

While testing the pipeline, the windowing strategy for my PCollections was:

private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){ 
    return summary 
      .apply("Apply Windows " + OperationName, Window 
        .<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1))) 
        .discardingFiredPanes() 
        .withAllowedLateness(Duration.standardDays(1))) 
      .apply("Count " + OperationName, Count.perKey()); 
} 

I set them to 1-minute FixedWindows so that I would get results quickly.
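With 1-minute FixedWindows, each element lands in the minute-aligned window that contains its timestamp. The plain-Java sketch below illustrates that assignment, assuming a zero window offset and half-open windows [start, start + size); the class and method names are illustrative, not Dataflow SDK APIs.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch only (not SDK code): which fixed window an element timestamp falls into,
// assuming zero offset. The window starts at the largest multiple of `size` <= ts.
final class FixedWindowSketch {

  /** Start (inclusive) of the fixed window containing ts. */
  static Instant windowStart(Instant ts, Duration size) {
    long sizeMs = size.toMillis();
    // floorMod keeps the result correct for timestamps before the epoch as well.
    long startMs = ts.toEpochMilli() - Math.floorMod(ts.toEpochMilli(), sizeMs);
    return Instant.ofEpochMilli(startMs);
  }

  /** End (exclusive) of that window: start + size. */
  static Instant windowEnd(Instant ts, Duration size) {
    return windowStart(ts, size).plus(size);
  }

  public static void main(String[] args) {
    Duration oneMinute = Duration.ofMinutes(1);
    Instant ts = Instant.parse("2016-09-21T10:15:42Z");
    // prints "2016-09-21T10:15:00Z .. 2016-09-21T10:16:00Z"
    System.out.println(windowStart(ts, oneMinute) + " .. " + windowEnd(ts, oneMinute));
  }
}
```

Short windows like this close (and fire) every minute, which is why they are convenient for testing.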

My grouping looks like this:

private static PCollection<KV<String, CoGbkResult>> MergeSummary(PCollection<KV<String, Long>> Avail, PCollection<KV<String, Long>> ValuationOK, PCollection<KV<String, Long>> ValuationKO){ 
    return KeyedPCollectionTuple.of(Util.AVAIL, Avail) 
           .and(Util.VALUATION_OK, ValuationOK) 
           .and(Util.VALUATION_KO, ValuationKO) 
           .apply("Merge Summary", CoGroupByKey.create()); 
} 
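As a rough, in-memory illustration of what this CoGroupByKey produces (plain Java collections, not the Dataflow API; the tags `avail`, `valuationOK`, `valuationKO` and the sample keys are hypothetical), each key maps to one list of values per tagged input, with an empty list when an input has nothing for that key. The real transform groups per key *and per window*, which is why all inputs must agree on their windowing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory illustration only: mimics the shape of a CoGroupByKey result,
// ignoring windows, triggers and distribution.
final class CoGroupSketch {

  /**
   * @param taggedInputs input tag -> list of (key, value) pairs
   * @return key -> (input tag -> values for that key from that input)
   */
  static Map<String, Map<String, List<Long>>> coGroup(
      Map<String, List<Map.Entry<String, Long>>> taggedInputs) {
    Map<String, Map<String, List<Long>>> result = new TreeMap<>();
    for (Map.Entry<String, List<Map.Entry<String, Long>>> input : taggedInputs.entrySet()) {
      String tag = input.getKey();
      for (Map.Entry<String, Long> kv : input.getValue()) {
        result.computeIfAbsent(kv.getKey(), k -> new TreeMap<>())
            .computeIfAbsent(tag, t -> new ArrayList<>())
            .add(kv.getValue());
      }
    }
    // Every key gets an entry for every tag; inputs with no values for it get an empty list.
    for (Map<String, List<Long>> perTag : result.values()) {
      for (String tag : taggedInputs.keySet()) {
        perTag.putIfAbsent(tag, new ArrayList<>());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<Map.Entry<String, Long>>> inputs = new LinkedHashMap<>();
    inputs.put("avail", List.of(Map.entry("shop-1", 3L), Map.entry("shop-2", 5L)));
    inputs.put("valuationOK", List.of(Map.entry("shop-1", 2L)));
    inputs.put("valuationKO", List.of(Map.entry("shop-2", 1L)));
    System.out.println(coGroup(inputs));
  }
}
```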

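This runs smoothly when I test it both locally and in the cloud. However, I then set the windows to their actual production values, one-day CalendarWindows, as follows: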

private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){
    return summary
      .apply("Apply Windows " + OperationName, Window
        .<KV<String, Long>>into(CalendarWindows.days(1).withTimeZone(DateTimeZone.UTC).withStartingDay(2016, 9, 20)) // Per-day windowing.
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(1))) // Accepts data up to 1 day late.
      .apply("Count " + OperationName, Count.perKey());
}
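For reference, a one-day calendar window in UTC is expected to cover one midnight-to-midnight UTC day. The java.time sketch below shows that alignment under the assumption that windows start at midnight UTC (the class and method names are illustrative, not SDK APIs); note that two elements two seconds apart can land in different daily windows.

```java
import java.time.Instant;
import java.time.ZoneOffset;

// Sketch only: one-day windows aligned to midnight UTC, i.e. what a daily
// calendar window in the UTC time zone is expected to cover. Not SDK code.
final class DailyWindowSketch {

  /** Start (inclusive) of the UTC calendar day containing ts. */
  static Instant dayStart(Instant ts) {
    return ts.atZone(ZoneOffset.UTC).toLocalDate().atStartOfDay(ZoneOffset.UTC).toInstant();
  }

  /** End (exclusive) of that day: the next midnight UTC. */
  static Instant dayEnd(Instant ts) {
    return dayStart(ts).atZone(ZoneOffset.UTC).plusDays(1).toInstant();
  }

  public static void main(String[] args) {
    Instant late = Instant.parse("2016-09-21T23:59:59Z");
    Instant early = Instant.parse("2016-09-22T00:00:01Z");
    // Two seconds apart, but in different daily windows.
    System.out.println(dayStart(late) + " .. " + dayEnd(late));   // 2016-09-21T00:00:00Z .. 2016-09-22T00:00:00Z
    System.out.println(dayStart(early) + " .. " + dayEnd(early)); // 2016-09-22T00:00:00Z .. 2016-09-23T00:00:00Z
  }
}
```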

When I do that, I can't even build the pipeline; constructing it throws this exception:

Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible window windowFns: com.google.clou…6af9fcb2, com.google.clou…6cce16f4
at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:121) 
at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:105) 
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274) 
at com.google.cloud.dataflow.sdk.values.PCollectionList.apply(PCollectionList.java:175) 
at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:124) 
at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:74) 
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290) 
at com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:116) 

Reading up on this, I found the following in the documentation:

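When using CoGroupByKey to group PCollections that have a windowing strategy applied, all of the PCollections you want to group must use the same windowing strategy and window sizing. For example, all the collections you are merging must use (hypothetically) identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.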

If your pipeline attempts to use CoGroupByKey to merge PCollections with incompatible windows, Dataflow will throw an IllegalStateException when your pipeline is constructed.
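The rule quoted above amounts to a check that runs while the pipeline graph is being built, before any data flows. A hypothetical plain-Java model of such a check (`FixedSpec` and `checkInputs` are illustrative stand-ins, not Dataflow classes) accepts inputs whose fixed-window sizes match by value and rejects a 5-minute/1-minute mix:

```java
import java.time.Duration;
import java.util.List;

// Hypothetical model of a construction-time compatibility check; FixedSpec is
// an illustrative stand-in for a fixed-window WindowFn, not an SDK class.
final class CompatibilityCheckSketch {

  static final class FixedSpec {
    final Duration size;

    FixedSpec(Duration size) { this.size = size; }

    // Compatible when the window size is equal by value.
    boolean isCompatible(FixedSpec other) {
      return size.equals(other.size);
    }
  }

  /** Throws before any data is processed if an input's windows differ from the first input's. */
  static void checkInputs(List<FixedSpec> inputs) {
    FixedSpec first = inputs.get(0);
    for (FixedSpec spec : inputs) {
      if (!first.isCompatible(spec)) {
        throw new IllegalStateException(
            "Inputs had incompatible window functions: " + first.size + ", " + spec.size);
      }
    }
  }

  public static void main(String[] args) {
    checkInputs(List.of(new FixedSpec(Duration.ofMinutes(5)), new FixedSpec(Duration.ofMinutes(5))));
    System.out.println("5 min + 5 min: accepted");
    try {
      checkInputs(List.of(new FixedSpec(Duration.ofMinutes(5)), new FixedSpec(Duration.ofMinutes(1))));
    } catch (IllegalStateException e) {
      System.out.println("5 min + 1 min: " + e.getMessage());
    }
  }
}
```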

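It is clear that Dataflow considers my PCollections to have incompatible windows; however, all of them were windowed by the same function I copied above. So how can I CoGroupByKey a set of PCollections that use the same CalendarWindows windowing strategy?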

Answer


This looks like a bug in CalendarWindows. To work around it, create a single CalendarWindows object and use it as the WindowFn for every PCollection, instead of creating a separate CalendarWindows object for each one.
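The exception names two different WindowFn instances (one ending in `6af9fcb2`, the other in `6cce16f4`), which fits this diagnosis: each call to applyWindowsAndCount built its own CalendarWindows WindowFn, and the SDK did not treat the two as compatible. One plausible mechanism, assumed here rather than confirmed from the SDK source, is a compatibility check that compares a configuration object such as the start date by reference (`==`) instead of by value. The plain-Java model below (`StartDay`, `DaysWindowFn` and `dailyFrom` are hypothetical names) shows why such a check rejects two identically configured instances, and why reusing a single instance sidesteps it:

```java
import java.util.Objects;

// Hypothetical model (not the SDK source) of a window function whose
// compatibility check compares its start-date object by reference.
final class SharedWindowFnSketch {

  /** Illustrative stand-in for a start-date value object. */
  static final class StartDay {
    final int year, month, day;

    StartDay(int year, int month, int day) { this.year = year; this.month = month; this.day = day; }

    @Override public boolean equals(Object o) {
      if (!(o instanceof StartDay)) return false;
      StartDay s = (StartDay) o;
      return year == s.year && month == s.month && day == s.day;
    }

    @Override public int hashCode() { return Objects.hash(year, month, day); }
  }

  /** Illustrative stand-in for a days(n) calendar window function. */
  static final class DaysWindowFn {
    final int number;
    final StartDay startDate;

    DaysWindowFn(int number, StartDay startDate) { this.number = number; this.startDate = startDate; }

    /** Assumed buggy check: start dates compared by reference. */
    boolean isCompatibleByReference(DaysWindowFn other) {
      return number == other.number && startDate == other.startDate;
    }

    /** Value-based check, as a fix would do. */
    boolean isCompatibleByValue(DaysWindowFn other) {
      return number == other.number && startDate.equals(other.startDate);
    }
  }

  /** Each call builds a fresh start-date object, like building the WindowFn once per PCollection. */
  static DaysWindowFn dailyFrom(int y, int m, int d) {
    return new DaysWindowFn(1, new StartDay(y, m, d));
  }

  public static void main(String[] args) {
    DaysWindowFn a = dailyFrom(2016, 9, 20);
    DaysWindowFn b = dailyFrom(2016, 9, 20);
    System.out.println("separate instances, by reference: " + a.isCompatibleByReference(b)); // false
    System.out.println("separate instances, by value:     " + a.isCompatibleByValue(b));     // true

    // Workaround from the answer: build the window function once and reuse it everywhere.
    DaysWindowFn shared = dailyFrom(2016, 9, 20);
    System.out.println("shared instance, by reference:    " + shared.isCompatibleByReference(shared)); // true
  }
}
```

In the pipeline itself, the workaround amounts to evaluating `CalendarWindows.days(1).withTimeZone(DateTimeZone.UTC).withStartingDay(2016, 9, 20)` once, for example into a single variable or field, and passing that same object to every `Window.into(...)` call instead of rebuilding it inside applyWindowsAndCount.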


It worked! Thanks!