2016-04-26 45 views
1

我想加入使用Apache弗林克流API,但沒有兩個流被加入,我不知道閱讀文檔後,我做了什麼錯兩個流的加入不工作

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
    DataStream<MyPojo2> source = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ola"), new MyPojo2(2, "Ola"))) 
      .assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>()); 
    DataStream<MyPojo2> source2 = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ela"), new MyPojo2(2, "Ela"))) 
      .assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>()); 
    DataStream<Tuple2<String, String>> joined = source.join(source2).where(keySelector).equalTo(keySelector). 
      window(GlobalWindows.create()).apply(joinFunction); 
    joined.print(); 
    env.execute("Window"); 

主要功能是根本myPojo.getFirst()

回答

2

GlobalWindows窗口永不會觸發,除非您指定自定義Trigger。在你的例子中,如果你使用類似TumblingEventTimeWindows.of(Time.seconds(5))的東西,你應該看到結果。

+0

因此,有沒有什麼辦法讓兩個流加入他們的完整歷史,使用流API?我不想使用批API,因爲我希望儘快獲得第一個結果,但也許有一些參數用於調整批API。 – Artur

+0

加入他們的完整歷史是不可能的,因爲他們是(技術上)無限的流。您可以指定一個以給定時間間隔定期觸發的觸發器。這樣你就可以加入目前的狀況。例如,你可以使用'ContinuousProcessingTimeTrigger.of(Time.minutes(40))'。如果您想要在觸發時刪除內容,您還可以將其包含在「PurgingTrigger」中。 – aljoscha