2016-09-30 90 views
2

我試圖在Flink中創建我的第一個實時分析作業。這種方法就像kappa-architecture-like一樣,所以我在Kafka上有我的原始數據,我們收到任何實體狀態變化的消息。使用Flink計算流狀態實體的最新狀態

,以便郵件的形式爲:

(id,newStatus, timestamp) 

我們要計算,爲每一個時間窗口,在給定的狀態的項目數。所以輸出應該是這樣的形式:

(outputTimestamp, state1:count1,state2:count2 ...) 

或等同物。在任何給定時間,這些行應包含處於給定狀態的項目的計數,其中與Id關聯的狀態是針對該ID觀察到的最近消息。在任何情況下都應該計算一個id的狀態,即使這個事件比處理的時間要早​​。所以所有計數的總和應該等於系統中觀察到的不同ID的數量。以下步驟可能會在一段時間後忘記最後一個項目中的項目,但目前這不是一個嚴格的要求。

這將寫入elasticsearch然後查詢。

我嘗試了很多不同的路徑,沒有一個完全滿足要求。使用滑動窗口,我可以很容易地實現預期的行爲,除了當滑動窗口的開始超過事件的時間戳時,正如您所期望的那樣,它丟失了數量。其他方法在處理積壓時未能保持一致,因爲我在鍵數據和時間戳方面做了一些技巧,這些技巧在數據一次處理完成時失敗。

所以我想知道,即使在高層次,我該如何解決這個問題。它看起來像是一個相對常見的用例,但事實上,給定ID的相關信息必須無限期地保留以正確計數實體會產生很多問題。

回答

3

我想我對你的問題的解決方案:鑑於(id, state, time)一個DataStream作爲

val stateUpdates: DataStream[(Long, Int, ts)] = ??? 

您獲得實際狀態變化如下:

val stateCntUpdates: DataStream[(Int, Int)] = s // (state, cntUpdate) 
    .keyBy(_._1) // key by id 
    .flatMap(new StateUpdater) 

StateUpdater是有狀態的FlatMapFunction。它有一個鍵入狀態,用於存儲每個ID的最後一個狀態。對於每個輸入記錄,它將返回兩個狀態計數更新記錄:(oldState, -1)(newState, +1)。記錄(oldState, -1)可確保減少先前狀態的計數。

下一頁您彙總每個州和窗口狀態計數的變化:

val cntUpdatesPerWindow: DataStream[(Int, Int, Long)] = stateCntUpdates // (state, cntUpdate, time) 
    .keyBy(_._1) // key by state 
    .timeWindow(Time.minutes(10)) // window should be non-overlapping, e.g. Tumbling 
    .apply(new SumReducer(), new YourWindowFunction()) 

SumReducer和的cntUpdates和YourWindowFunction分配你的窗口的時間戳。此步驟彙總了窗口中每個狀態的所有狀態更改。

最後,我們用計數更新調整當前計數。

val stateCnts: DataStream[(Int, Int, Long)] = cntUpdatesPerWindow // (state, count, time) 
    .keyBy(_._1) // key by state again 
    .map(new CountUpdater) 

CountUpdater是一個有狀態MapFunction。它有一個鍵控狀態,用於存儲每個狀態的當前計數。對於每個輸入記錄,調整計數併發出記錄(state, newCount, time)

現在你有一個每個狀態都有一個新計數的流(每個狀態一個記錄)。如果可能,您可以使用這些記錄更新您的Elasticsearch索引。如果您需要收集特定時間的所有狀態計數,則可以使用窗口來完成。

請注意:此程序的狀態大小取決於唯一ID的數量。這可能會導致問題,如果id空間增長非常快。

+0

我正在處理您的建議,非常感謝。我在這裏錯過的是'YourWindowFunction'應該做的事情。我沒有活動時間的概念,所以我不能指定時間戳。此外,這個解決方案似乎正在處理時間,而我關心事件時間。我不能讓它運行,但爲了我所得到的,這與我所需要的稍有不同。 – Chobeat

+0

這也適用於事件時間。您需要在exec env上設置正確的'TimeCharacteristics'並指定時間戳+水印。唯一的時間依賴操作是窗口。 'YourWindowFunction'指定窗口的時間戳。 'WindowFunction.apply()'有一個'TimeWindow'參數,可以訪問窗口的開始和結束時間。請參閱[文檔](https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation)。 –

+0

'TimeCharacteristics'已設置,但我不知道如何爲此佈局分配時間戳?我應該隨時更新時間戳嗎?像(狀態,計數,時間戳)? – Chobeat