2016-03-15 95 views
2

TL; DR定製水槽攔截:截距()方法調用多次對同一事件

當水槽源未能事務推到在流水線中的下一信道,它始終保持事件實例爲下一次嘗試?

一般情況下,安全有有狀態 Flume攔截器,其中事件的處理取決於以前處理的事件?

全部問題描述:

我正在考慮利用有關方式主題分區消費者分佈在消費羣體在現有的基礎水槽日誌進行流式重複數據刪除阿帕奇卡夫卡提供擔保的可能性整合架構。

使用Kafka Source for Flume和自定義路由到Kafka主題分區,我可以確保應該進入同一邏輯「重複數據刪除隊列」的每個事件將由集羣中的單個Flume代理進行處理(只要因爲集羣內沒有代理停止/啓動)。我有使用定製的水槽攔截以下設置:

[KafkaSource重複數據刪除攔截] - >()MemoryChannel) - > [HDFSSink]

看來,當水槽卡夫卡源轉輪是無法將一批事件推送到內存通道,作爲該批次的一部分的事件實例再次傳遞給我的攔截器的方法intercept()方法。在這種情況下,很容易將標籤(以Flume事件標頭的形式)添加到已處理的事件中,以將實際重複與重新處理的失敗批次中的事件區分開來。

但是,我想知道是否有任何明確的保證,說明失敗事務中的事件實例是爲下一次嘗試而保留的,或者是否有可能從實際源再次讀取事件(在這種情況下,Kafka )並從零重新構建。在這種情況下,我的攔截器會認爲這些事件是重複的並丟棄它們,儘管它們從未被傳送到頻道。

編輯

這是我的攔截器是如何區分一個已經從非處理事件處理的事件實例:

public Event intercept(Event event) { 
    Map<String,String> headers = event.getHeaders(); 
    // tagHeaderName is the name of the header used to tag events, never null 
    if(!tagHeaderName.isEmpty()) { 
    // Don't look further if event was already processed... 
    if(headers.get(tagHeaderName)!=null) 
     return event; 
    // Mark it as processed otherwise... 
    else 
     headers.put(tagHeaderName, ""); 
    } 
    // Continue processing of event... 
} 

回答

0

我遇到了類似的問題:

當接收器寫入失敗,Kafka Source仍保存已由攔截器處理的數據。在下一次嘗試中,這些數據將發送給攔截器,並一次又一次地處理。通過閱讀KafkaSource的代碼,我相信這是bug。

我的攔截器會從原始消息中剝離一些信息,並修改原始消息。由於這個錯誤,重試機制永遠不會按預期工作。

到目前爲止,The是不容易的解決方案。

+0

事實上,當通道事務失敗時,攔截器再次處理相同的Kafka事件本身並不是問題;畢竟,該事件無法交付,因此預計會有重試。我想知道的是,是否有保證JVM中的相同Event **實例**,以及攔截器做出的任何修改將再次使用,或者如果存在可能性(或者在我的情況下,*風險*)該活動將再次從卡夫卡話題中讀取。 – Shadocko

+0

請注意,如果您只是要區分已在攔截器的代碼中已經處理的事件實例,則可以爲事件添加*標記*標頭並跳過使用此*標記*標頭處理事件。 – Shadocko