我試圖在Flink中創建我的第一個實時分析作業。這種方法就像kappa-architecture-like一樣,所以我在Kafka上有我的原始數據,我們收到任何實體狀態變化的消息。使用Flink計算流狀態實體的最新狀態
,以便郵件的形式爲:
(id,newStatus, timestamp)
我們要計算,爲每一個時間窗口,在給定的狀態的項目數。所以輸出應該是這樣的形式:
(outputTimestamp, state1:count1,state2:count2 ...)
或等同物。在任何給定時間,這些行應包含處於給定狀態的項目的計數,其中與Id關聯的狀態是針對該ID觀察到的最近消息。在任何情況下都應該計算一個id的狀態,即使這個事件比處理的時間要早。所以所有計數的總和應該等於系統中觀察到的不同ID的數量。以下步驟可能會在一段時間後忘記最後一個項目中的項目,但目前這不是一個嚴格的要求。
這將寫入elasticsearch然後查詢。
我嘗試了很多不同的路徑,沒有一個完全滿足要求。使用滑動窗口,我可以很容易地實現預期的行爲,除了當滑動窗口的開始超過事件的時間戳時,正如您所期望的那樣,它丟失了數量。其他方法在處理積壓時未能保持一致,因爲我在鍵數據和時間戳方面做了一些技巧,這些技巧在數據一次處理完成時失敗。
所以我想知道,即使在高層次,我該如何解決這個問題。它看起來像是一個相對常見的用例,但事實上,給定ID的相關信息必須無限期地保留以正確計數實體會產生很多問題。
我正在處理您的建議,非常感謝。我在這裏錯過的是'YourWindowFunction'應該做的事情。我沒有活動時間的概念,所以我不能指定時間戳。此外,這個解決方案似乎正在處理時間,而我關心事件時間。我不能讓它運行,但爲了我所得到的,這與我所需要的稍有不同。 – Chobeat
這也適用於事件時間。您需要在exec env上設置正確的'TimeCharacteristics'並指定時間戳+水印。唯一的時間依賴操作是窗口。 'YourWindowFunction'指定窗口的時間戳。 'WindowFunction.apply()'有一個'TimeWindow'參數,可以訪問窗口的開始和結束時間。請參閱[文檔](https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation)。 –
'TimeCharacteristics'已設置,但我不知道如何爲此佈局分配時間戳?我應該隨時更新時間戳嗎?像(狀態,計數,時間戳)? – Chobeat