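TL;DR: "Inputs to Flatten had incompatible window windowFns" when using CoGroupByKey with CalendarWindows
How can I CoGroupByKey a set of PCollections that all use the same CalendarWindows windowing strategy?
Long version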
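I'm writing a Dataflow pipeline that reads from two different Pub/Sub sources. One of the PCollections is split into a PCollectionTuple, and finally I try to join them in a CoGroupByKey before saving the result in BigQuery.
While testing the pipeline, the windowing strategy for my PCollections was: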
private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){
    return summary
        .apply("Apply Windows " + OperationName, Window
            .<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1))) // 1-minute test windows.
            .discardingFiredPanes()
            .withAllowedLateness(Duration.standardDays(1))) // Accept data up to 1 day late.
        .apply("Count " + OperationName, Count.perKey());
}
I gave them 1-minute FixedWindows so that I would get results quickly.
My grouping looks like this:
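For reference, here is a plain-Java model (not the Dataflow SDK; the class name `FixedWindowModel` is made up) of how a zero-offset fixed window is chosen for an element: the window start is the element timestamp rounded down to a multiple of the window size, and the window covers `[start, start + size)`.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of zero-offset fixed-window assignment, not SDK code.
final class FixedWindowModel {
    // Inclusive start of the fixed window that contains tsMillis.
    static long windowStart(long tsMillis, long sizeMillis) {
        // floorMod keeps the result correct for timestamps before the epoch.
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(1).toMillis();
        long ts = Instant.parse("2016-09-20T10:15:42Z").toEpochMilli();
        long start = windowStart(ts, size);
        // Print the window as [start, end).
        System.out.println(Instant.ofEpochMilli(start) + " .. " + Instant.ofEpochMilli(start + size));
    }
}
```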
private static PCollection<KV<String, CoGbkResult>> MergeSummary(PCollection<KV<String, Long>> Avail, PCollection<KV<String, Long>> ValuationOK, PCollection<KV<String, Long>> ValuationKO){
    return KeyedPCollectionTuple.of(Util.AVAIL, Avail)
        .and(Util.VALUATION_OK, ValuationOK)
        .and(Util.VALUATION_KO, ValuationKO)
        .apply("Merge Summary", CoGroupByKey.create());
}
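Conceptually, within each window CoGroupByKey gathers, for every key, the values from each tagged input, and a tag with no data for that key yields an empty collection. The sketch below models that in plain Java, assuming one window; the tag strings stand in for `Util.AVAIL`, `Util.VALUATION_OK` and `Util.VALUATION_KO`, and the class and method names are hypothetical, not the SDK's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of CoGroupByKey output for a single window.
final class CoGroupModel {
    // Input: tag -> (key -> count). Output: key -> (tag -> values for that key).
    static Map<String, Map<String, List<Long>>> coGroup(Map<String, Map<String, Long>> inputsByTag) {
        Map<String, Map<String, List<Long>>> out = new TreeMap<>();
        for (Map.Entry<String, Map<String, Long>> input : inputsByTag.entrySet()) {
            String tag = input.getKey();
            for (Map.Entry<String, Long> kv : input.getValue().entrySet()) {
                out.computeIfAbsent(kv.getKey(), k -> new LinkedHashMap<>())
                   .computeIfAbsent(tag, t -> new ArrayList<>())
                   .add(kv.getValue());
            }
        }
        // A tag with no data for a key still appears, as an empty list.
        for (Map<String, List<Long>> byTag : out.values()) {
            for (String tag : inputsByTag.keySet()) {
                byTag.putIfAbsent(tag, new ArrayList<>());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> inputs = new LinkedHashMap<>();
        inputs.put("AVAIL", Map.of("k1", 3L, "k2", 1L));
        inputs.put("VALUATION_OK", Map.of("k1", 2L));
        inputs.put("VALUATION_KO", Map.of("k2", 5L));
        System.out.println(coGroup(inputs));
    }
}
```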
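This ran smoothly when I tested it both locally and in the cloud. However, when I set the windowing to the real production value, daily CalendarWindows of 1-day length, as follows: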
private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){
    return summary
        .apply("Apply Windows " + OperationName, Window
            .<KV<String, Long>>into(CalendarWindows.days(1).withTimeZone(DateTimeZone.UTC).withStartingDay(2016,9,20)) // Per-day windowing.
            .discardingFiredPanes()
            .withAllowedLateness(Duration.standardDays(1))) // Accept data up to 1 day late.
        .apply("Count " + OperationName, Count.perKey());
}
then the pipeline won't even build: as soon as it is constructed, I get this exception:
Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible window windowFns: com.google.clou[email protected]6af9fcb2, com.google.clou[email protected]6cce16f4
at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:121)
at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:105)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollectionList.apply(PCollectionList.java:175)
at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:124)
at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:74)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290)
at com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:116)
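A side note on the production windowing: in UTC, a 1-day calendar window begins at midnight, which is also where a zero-offset 1-day fixed window begins, because epoch milliseconds count every day as exactly 86,400,000 ms. The check below uses plain `java.time` (the class name is hypothetical) to compare both boundaries. If it holds, `FixedWindows.of(Duration.standardDays(1))` may be a usable stand-in for the daily UTC windows here, and the 1-minute test run already showed that separately created `FixedWindows` inputs join without complaint. This equivalence would not hold for a time zone with daylight-saving shifts.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper comparing UTC calendar-day starts with 1-day fixed-window starts.
final class DayWindowAlignment {
    static final long DAY_MILLIS = Duration.ofDays(1).toMillis();

    // Start (epoch millis) of the UTC calendar day containing tsMillis.
    static long utcCalendarDayStart(long tsMillis) {
        LocalDate day = Instant.ofEpochMilli(tsMillis).atZone(ZoneOffset.UTC).toLocalDate();
        return day.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    // Start (epoch millis) of the zero-offset 1-day fixed window containing tsMillis.
    static long fixedDayStart(long tsMillis) {
        return tsMillis - Math.floorMod(tsMillis, DAY_MILLIS);
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2016-09-21T23:59:59Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(utcCalendarDayStart(ts)));
        System.out.println(Instant.ofEpochMilli(fixedDayStart(ts)));
    }
}
```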
Reading the documentation, I found this:
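When using CoGroupByKey to group PCollections that have a windowing strategy applied, all of the PCollections you want to group must use the same windowing strategy and window sizing. For example, all of the collections you're merging must use (hypothetically) identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.
If your pipeline attempts to use CoGroupByKey to merge PCollections with incompatible windows, Dataflow will generate an IllegalStateException error when your pipeline is constructed.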
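It is clear that Dataflow considers my PCollections to have incompatible windows; however, all of them had their windowing applied with the same function I copied above. So how can I CoGroupByKey a set of PCollections that all use the same CalendarWindows windowing strategy?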
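One explanation consistent with the error text (both WindowFns appear to be printed as `ClassName@hash`, the default `Object.toString()`) is that this calendar WindowFn decides compatibility by object identity rather than by its configuration. Because `applyWindowsAndCount` calls `CalendarWindows.days(1)...` on every invocation, each input would then carry a distinct, and therefore "incompatible", instance. The plain-Java model below is an assumption about that mechanism, with made-up class names, not the SDK's code. If it matches, a workaround is to build the `CalendarWindows` WindowFn once (for example in a `static final` field) and pass that same object to every `Window.into(...)`.

```java
// Hypothetical stand-ins for WindowFn, not the Dataflow SDK classes.
interface WindowFnModel {
    boolean isCompatible(WindowFnModel other);
}

// Never overrides equals(), so compatibility degrades to object identity.
final class IdentityDays implements WindowFnModel {
    final int days;
    IdentityDays(int days) { this.days = days; }
    @Override public boolean isCompatible(WindowFnModel other) { return this.equals(other); }
}

// Compares configuration, as separately built FixedWindows evidently do.
final class ValueDays implements WindowFnModel {
    final int days;
    ValueDays(int days) { this.days = days; }
    @Override public boolean isCompatible(WindowFnModel other) { return this.equals(other); }
    @Override public boolean equals(Object o) {
        return o instanceof ValueDays && ((ValueDays) o).days == days;
    }
    @Override public int hashCode() { return Integer.hashCode(days); }
}

final class CompatibilityDemo {
    // Rough analogue of the pre-Flatten check: every input must match the first.
    static boolean allCompatible(WindowFnModel... fns) {
        if (fns.length == 0) return true;
        for (WindowFnModel fn : fns) {
            if (!fns[0].isCompatible(fn)) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allCompatible(new IdentityDays(1), new IdentityDays(1))); // false
        IdentityDays shared = new IdentityDays(1);
        System.out.println(allCompatible(shared, shared));                         // true
        System.out.println(allCompatible(new ValueDays(1), new ValueDays(1)));     // true
    }
}
```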
It worked! Thanks!