2016-11-06 60 views
0

我正在使用Flink 1.2-Snapshot。我的數據如下所示:Flink Streaming Windowing - 每個窗口的最後一個事件屬於下一個窗口

  • ID = 25398102,的SourceID = 1,TS = 2016年10月15日0時00分56秒,用戶= 14,值= 919
  • ID = 25398185,的SourceID = 1,ts = 2016-10-15 00:01:06,user = 14,value = 920
  • id = 25398210,sourceId = 1,ts = 2016-10-15 00:01:16,user = 14,值= 944
  • ID = 25398235,的SourceID = 1,TS = 2016年10月15日0時01分24秒,用戶= 3149,值= 944
  • ID = 25398236,的SourceID = 1,TS = 2016-10 -15 00:01:25,user = 71,value = 955
  • id = 25398239,sour ceId = 1,ts = 2016-10-15 00:01:26,user = 71,value = 955
  • id = 25398265,sourceId = 1,ts = 2016-10-15 00:01:36,user = 71,value = 955
  • id = 25398310,sourceId = 1,ts = 2016-10-15 00:02:16,user = 14,value = 960
  • id = 25398320,sourceId = 1,ts = 2016約10-15 0時02分26秒,用戶= 14,值= 1000

我運行下面的代碼來創建基於Windows的用戶ID:

stream.flatMap(new LogsParser()) 
      .assignTimestampsAndWatermarks(new MessageTimestampExtractor()) 
      .keyBy("sourceId") 
      .window(GlobalWindows.create()) 
      .trigger(PurgingTrigger.of(new MySessionTrigger())) 
      .apply(new SessionWindowFunction()) 
      .print(); 

MySession的觸發眺望RECE並檢查用戶標識以在用戶標識更改時觸發窗口。 SessionWindowFunction只是在窗口外創建一個會話。

下面是創建的會話:

  1. 會話:

    • ID = 25398102,的SourceID = 1,TS = 2016年10月15日○點00分五十六秒,用戶= 14,值= 919
    • ID = 25398185,的SourceID = 1,TS = 2016年10月15日0時01分06秒,用戶= 14,值= 920
    • ID = 25398210,的SourceID = 1,TS = 2016-10 -15 00:01:16,user = 14,value = 944
    • ID = 25398235,的SourceID = 1,TS = 2016年10月15日0時01分24秒,用戶= 3149,值= 944
  2. 會話:

    • ID = 25398236,的SourceID = 1,ts = 2016-10-15 00:01:25,user = 71,value = 955
    • id = 25398239,sourceId = 1,ts = 2016-10-15 00:01:26,user = 71,值= 955
    • ID = 25398265,的SourceID = 1,TS = 2016年10月15日0點01分36秒,用戶= 71,值= 955
    • ID = 25398310,的SourceID = 1,TS = 2016-10 -15 00:02:16,用戶= 14,值= 960
  3. 會話:

    • ID = 25398320,的SourceID = 1,TS =二○一六年十月一十五日0時02分26秒,用戶= 14,值= 1000

您可以看到的問題是,在每個會話中,最後一個事件實際上屬於下一個窗口。當最後一個事件已經在窗口中時,觸發該窗口的決定遲遲不了。

如何在不考慮該窗口中的最後一個事件的情況下觸發窗口?

回答

0

一個想法是在用戶標識更改時使用平面地圖將標記插入流中。然後,只要看到其中一個標記,觸發器就會觸發,並且會話窗口功能可以過濾掉標記。

相關問題