2016-07-15 83 views
1

我試圖使用OffsetCommitRequest其中包含在以下包卡夫卡消費者API 0.9版本: org.apache.kafka.common.requests.OffsetCommitRequest如何向卡夫卡的消費者發送OffsetCommitRequest?

如何發送這個要求嗎?什麼是使用此的理想方式? 我想在卡夫卡本身提交補償。我沒有找到任何與0.9版相關的文檔。它大部分可用於0.8.x

此外,此請求的構造函數需要生成id,成員id和保留時間。這些領域是什麼?

+0

通過這個鏈接https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka – Aby

+0

我已經做到了。這是爲舊版本。它使用BlockingChannel發送0.9 – vamosromil

+0

中不可用的請求。0.9版的Kafka文檔可以在這裏找到:http://kafka.apache.org/090/documentation.html – Aby

回答

3

如果要提交手動偏移,也許你應該設置消費屬性

enable.auto.commit=false

,並使用commitSync()或commitAsync()卡夫卡消費者的方法。 例如,您可以在處理完所有ConsumerRecords後調用commitSync()。 或者你也可以只提交你想要的每個接收到的ConsumerRecord之後的TopicPartition。就像這樣:

Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(); 
offsetMap.put(new TopicPartition(someTopic, somePartition), new OffsetAndMetadata(someOffset)); 
kafkaConsumer.commitSync(offsetMap); 
+0

這是我採取的最終解決方案。謝謝反正:) 但我仍然想回答我的原始問題。我認爲這個請求對象有時可以非常方便。 – vamosromil