2016-11-08 111 views
2

我正在使用Kafka處理日誌事件。我對Kafka Connect和Kafka Streams有簡單連接器和流轉換的基本知識。Kafka將單個日誌事件行聚合到組合日誌事件中

現在我有具有以下結構的日誌文件:

timestamp event_id event 

日誌事件具有由event_id的連接的多個日誌行(例如,郵件日誌)

實施例:

1234 1 START 
1235 1 INFO1 
1236 1 INFO2 
1237 1 END 

而且一般有多個事件:

Examp勒:

1234 1 START 
1234 2 START 
1235 1 INFO1 
1236 1 INFO2 
1236 2 INFO3 
1237 1 END 
1237 2 END 

的時間窗口(開始和結束之間)可達到5分鐘。

至於結果我要像

event_id combined_log 

例的話題:

1 START,INFO1,INFO2,END 
2 START,INFO2,END 

什麼是實現這一目標的正確的工具?我試圖用卡夫卡流解決它,但我可以弄清楚如何...

回答

2

在你的用例中,你基本上是基於消息有效載荷來重建會話或事務。目前沒有內置的,即時可用的支持這種功能。但是,您可以使用Kafka的Streams API的Processor API部分來自行實現此功能。您可以編寫自定義處理器,使用狀態存儲區來跟蹤給定密鑰何時開始,添加和結束會話/事務。

郵件列表中的一些用戶已經在做IIRC,但我不知道現有的代碼示例,我可以指出您。

您需要注意的是正確處理亂序數據。在你上面的例子,你列出了正確的順序所有輸入數據:

1234 1 START 
1234 2 START 
1235 1 INFO1 
1236 1 INFO2 
1236 2 INFO3 
1237 1 END 
1237 2 END 

但在實踐中,信息/記錄可能到達了序,像這樣(我只顯示與關鍵1信息,以簡化的例子) :

1234 1 START 
1237 1 END 
1236 1 INFO2 
1235 1 INFO1 

即使出現這種情況,據我所知,在您的使用情況下,你仍然要將此數據解釋爲:START -> INFO1 -> INFO2 -> END而非START -> END(忽略/丟棄INFO1INFO2 =數據丟失​​)或START -> END -> INFO2 -> INFO1(順序不正確,可能也違反了你的語義限制)。

+0

感謝您的回答,我會看看Processor API。是的,訂單問題也應該考慮。 – imehl

+1

處理器API是解決方案 - 再次感謝! – imehl

+0

@imehl:或許您想更新您的問題,並提供一些信息,說明您最終做了什麼來解決您的問題,現在您找到了解決方案? –