2016-11-04 93 views
2

在鍵控流上,我希望在新事件到達時立即爲每個新傳入事件計算一次窗口函數,並在過去30天內爲其提供此密鑰的所有早期事件的上下文,一個迭代器。Flink Streaming:如何處理每個事件與過去30天的所有事件?

預期的行爲類似於30天長度和1納秒幻燈片的滑動窗口,每傳入事件只計算一次窗口函數。

我看不出如何映射在內置翻滾/滑動/會話窗口與/之上沒有觸發器/逐出器這種行爲等

任何人可以幫助?或者這是否需要編寫我自己的Window Assigner或我自己的鍵控狀態處理?

回答

3

你說得對,用Flink提供的窗口原語來模擬你的用例並不容易。

我能想到的最佳解決方案是實現自定義運算符(OneInputStreamOperator)。這是一個相當低級的界面,可以訪問記錄時間戳,水印和狀態(許多Flink的內置操作符都基於該界面)。當接收到新記錄時,操作員會將其放入優先級隊列中,該隊列按時間戳排序,移除30天前的所有元素,並對隊列中其餘元素評估該功能。

請注意,應將隊列註冊爲受管狀態以使操作員容錯。如果您想使用事件時間,則只能在收到水印時執行評估和丟棄數據。

當實現OneInputStreamOperator接口時,可能有助於查看Flink的內置運算符,例如StreamFilter或其中一個更復雜的運算符。

可以使用transform()方法將定製操作員應用於DataStreamKeyedStream(由DataStream.keyBy()獲得)。

+0

謝謝你指點我正確的方向。添加強大的自定義轉換非常簡單,給我留下了深刻的印象。下面是我如何實現解決方案的一些細節(現在跳過託管狀態的事情):使用OneInputStreamOperator擴展AbstractStreamOperator(請參閱StreamFilter for example方法實現),然後通過transform將您的自定義類實例應用到流(... )。順便說一句:這個用例在例如行爲分析,根據客戶id進行分組,並且希望根據歷史記錄立即對每個新事件做出反應。 –

+0

感謝您的評論。我將我的回答擴展到了'transform()'方法和'StreamFilter'類。 –

+0

@cubiclettuce請分享示例代碼,如果你已經解決了這個問題。我必須完成相同的任務。提前致謝。 –