2016-07-31 45 views
4

縱觀最新版本(v0.10)卡夫卡消費者documentation我可以檢索卡夫卡分區的最新可用偏移量,而無需檢索所有消息嗎?

「消費者的位置給將要給出了下一記錄的偏移量,它會比最高的偏移較大的一個消費者已經看到該分區,每當消費者接收數據呼叫輪詢(長)並且接收消息時,它就自動地前進。「

有沒有辦法查詢服務器端分區可用的最大偏移量,沒有檢索所有消息?

我試圖實現的邏輯如下:

  1. 查詢的每個第二未決的消息量(A)一個題目中的
  2. 如果A>閾值時,喚醒一個處理器,將繼續檢索的所有郵件,並處理它們
  3. 否則什麼也不做(睡眠1)

的動機是,我需要做一些批量處理,但我想喚醒處理器ü只有當有足夠的數據時(並且我不想檢索所有數據兩次)。

回答

4

可以使用Consumer.seekToEnd()方法,運行Consumer.poll(0)作出生效的,但立即返回,然後Consumer.position()找到所有認購(或指定)的主題分區的位置。這些將是所有分區的當前最終抵消額。這也將開始從這些經紀人那裏獲取一些數據以獲得這些偏移量,但是如果您隨後找回不同的位置,則任何返回的數據都將被忽略。

目前,serejja提到的另一種方法是使用舊的簡單使用者,但該過程比較複雜,因爲您需要手動爲每個分區找到組長。

+0

謝謝。我想知道是否可以避免兩次讀取所有數據(在上面描述的場景中)。例如,我可以將max.partition.fetch.bytes減少到非常小的值,以消除* poll(0)*的「副作用」*檢索實際數據? –

+0

你不需要調用poll()。 seekToEnd()是一個異步調用,您可以使用poll()或position()強制完成。使用seek ...()和position()不會讀取任何消息,只是少量的元數據 –

+0

@ChrisGerken如果您正在使用消費者組但尚未分配任務,仔細研究代碼,但看起來它會拋出'IllegalArgumentException')。對於手動分配的主題/主題分區,似乎可以正常工作。 –

0

不幸的是,我不明白0.10消費者可能會怎樣。但是,如果您有任何較低級別的Kafka客戶端(對不起,但我不確定是否存在JVM,但其他​​語言有很多),這是可行的。因此,如果你有一些時間和靈感來實現這一點,這裏的路要走 - 每FetchResponse(這是每個「給我留言」請求的響應)包含一個名爲HighwaterMarkOffset的字段,它基本上是在分區結束(https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse)。這裏的訣竅是發送一個FetchRequest,它會立即返回(例如不會阻止等待),只有HighwaterMarkOffset。

要做到這一點你FetchRequest應該有:

  1. MaxWaitTime設置爲0,這將意味着「立即返回,如果不能獲取至少MinBytes字節」。
  2. MinBytes設置爲0,意思是「如果你給我一個空的答覆,我很好」。
  3. FetchOffset在這種情況下並不重要,如果我沒有錯,它甚至可能是一個無效的偏移量,但最好是一個有效的偏移量。
  4. MaxBytes設置爲0,這意味着「給我不多於0字節的數據」,例如,沒有。

這樣這個請求將立即返回,沒有數據,但仍然將highwatermark offset設置爲合適的值。一旦你有了高水位偏移量,你可以將它與你當前的偏移量進行比較,並計算出你的背後有多少。

希望這會有所幫助。

+0

謝謝,@serejja!這肯定給了一個方向來進一步探索..任何想法如何使用內部[Fetcher類]的想法/代碼(https://github.com/apache/kafka/blob/trunk/clients/src/main/ java/org/apache/kafka/clients/consumer/internals/Fetcher.java)來實現這個目標? listOffset或內部sendListOffsetRequest方法看起來很有前途。 –

相關問題