2013-02-18 88 views
29

我使用的動物園管理員擺脫卡夫卡的數據。在這裏我總是從最後一個偏移點獲取數據。有什麼方法可以指定偏移時間來獲取舊數據?如何從卡夫卡的舊偏移點獲取數據?

有一個選項autooffset.reset。它接受最小或最大。有人可以解釋什麼是最小和最大的。 autooffset.reset可以幫助從舊偏移點而不是最新偏移點獲取數據嗎?

回答

20

消費者總是屬於一個組,每個分區時,動物園管理員保持跟蹤消費羣的分區中的進展情況。

要從頭開始獲取數據,可以刪除與進程相關聯的所有數據,侯賽因refered

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}"); 

您還可以指定偏移分區的需要,在覈心/ src目錄/主/ Scala的規定/kafka/tools/UpdateOffsetsInZK.scala

ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString) 

但是偏移不是時間索引,但你知道每個分區是一個序列。

如果消息包含時間戳(和提防這個時間戳無關卡夫卡收到你的消息的那一刻),你可以嘗試這樣做,試圖通過增加N個偏移量來檢索步驟一個條目的索引,並在某處存儲元組(主題X,第2部分,偏移100,時間戳)。

當您想要從指定時間檢索條目時,可以將二進制搜索應用於粗略索引,直到找到所需的條目並從該條目中提取。

3

請參閱有關kafka配置文檔:http://kafka.apache.org/08/configuration.html以查詢偏移參數的最小值和最大值。

順便說一句,在探索卡夫卡時,我想知道如何重播消費者的所有消息。我的意思是,如果一個消費者組織已經調查了所有的消息,並且希望重新獲得這些消息。

它可以實現的方式是從zookeeper中刪除數據。使用kafka.utils.ZkUtils類來刪除zookeeper上的節點。下面是它的用法:

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}"); 
7

從卡夫卡documentation他們說 「kafka.api.OffsetRequest.EarliestTime()查找日誌中的數據的開始,並開始從那裏流,kafka.api.OffsetRequest.LatestTime()將只流新消息。不要認爲偏移量0是開始偏移量,因爲消息隨時間超出日誌。「

使用SimpleConsumerExample這裏:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

類似的問題:Kafka High Level Consumer Fetch All Messages From Topic Using Java API (Equivalent to --from-beginning)

這可能有助於

+0

更多細節他們也有一個參考的代碼示例。值得一看 – Hild 2013-07-31 18:07:02

+0

Hild提到的例子是:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example你不能使用'Consumer'例子,你必須使用'SimpleConsumerDemo'示例可以使用偏移量。 – pherris 2014-02-18 04:37:56

1

卡夫卡協議文件是一個偉大的來源與請求/響應/偏移/信息發揮: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol 您使用簡單的消費者示例作爲以下代碼演示的狀態:

FetchRequest req = new FetchRequestBuilder() 

     .clientId(clientName) 

     .addFetch(a_topic, a_partition, readOffset, 100000) 

     .build(); 

FetchResponse fetchResponse = simpleConsumer.fetch(req); 

設置readOffset以從中開始初始偏移量。但是您需要檢查最大偏移量以及上面提供的有限偏移量計數,這是根據最後一個參數addFetch方法中的FetchSize計算的。

+0

檢查Kafka的0.9.0.0版本中提供的新API,他們通過組合簡單和高層次的消費者而進一步提高了效率。 – usman 2015-12-22 05:43:55

1

現在

卡夫卡常見問題給出回答這個問題。

如何使用OffsetRequest準確獲得某個時間戳的消息偏移量?

卡夫卡允許通過時間查詢消息的偏移量,並且它在段粒度這樣做。 timestamp參數是unix時間戳,通過時間戳查詢偏移量返回不遲於給定時間戳的消息的最新可能偏移量。有2個時間戳的特殊值 - 最新和最早的。對於任何其他unix時間戳值,Kafka將獲得不遲於給定時間戳創建的日誌段的起始偏移量。由於這一點,並且由於偏移請求僅以分段粒度進行服務,所以對於較大的分段大小,偏移取回請求返回較不準確的結果。

爲了更準確的結果,您可以配置(log.roll.ms),而不是大小(log.segment.bytes)基於時間的日誌段大小。但是應該小心,因爲這樣做可能會由於日誌段滾動頻繁而增加文件處理程序的數量。


未來計劃

卡夫卡將添加時間戳消息格式。請參閱

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

0

你有試過嗎?

斌/ kafka-console-consumer.sh --bootstrap-服務器localhost:9092 --topic測試--from-開始

這將打印出所有的消息在給定的主題, 「測試」 中這個例子。

從這個鏈接https://kafka.apache.org/quickstart