2016-06-09 120 views
6

如何獲取,設置或重置Kafka Connect連接器/任務/接收器的偏移量?卡夫卡連接偏移。 get/set方法?

我可以使用運行kafka.admin.ConsumerGroupCommand/usr/bin/kafka-consumer-groups工具來查看我所有常規卡夫卡消費者組的偏移量。但是,Kafka Connect任務和組不顯示與此工具。

同樣,我可以使用zookeeper-shell連接到Zookeeper,我可以看到普通卡夫卡消費者組的動物園管理員條目,但對於卡夫卡連接接收器則不能。

+0

作爲(壞)的解決方法,你可以刪除該連接器,並用不同的名稱註冊新的連接。顯然,這隻有在你不必經常這樣做時纔有意義。 – pederpansen

+0

[This](https://stackoverflow.com/questions/45670937/kafka-0-11-how-to-reset-offsets)是如何修改組的偏移量的很好的解釋。 –

回答

7

從0.10.0.0開始,Connect不提供管理偏移量的API。這是我們未來想要改進的東西,但還沒有。 ConsumerGroupCommand將是管理Sink連接器偏移的正確工具。請注意,源連接器偏移量存儲在Connect的特殊偏移量主題中(它們不像通常的Kafka偏移量,因爲它們是由源系統定義的,請參閱worker configuration docs中的offset.storage.topic),並且因爲連接器使用新消費者,在Zookeeper中存儲它們的偏移量 - 所有現代客戶都使用原生的基於Kafka的偏移量存儲。 ConsumerGroupCommand可以使用這些偏移量,您只需要通過--new-consumer選項)。

+1

有任何ETA或計劃嗎?可以看到https://issues.apache.org/jira/browse/KAFKA-4107也沒有進展或細節。 – SemanticBeeng

2

您無法設置偏移量,但可以使用kafka-consumer-groups.sh工具「向前滾動」前饋。

的消費羣的連接器具有的connect-*CONNECTOR NAME*一個名字,但你可以仔細檢查: unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --list

要查看當前的偏移: unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --group connect-*CONNECTOR NAME* --describe

要移動向前偏移: unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server *KAFKA HOSTS* --topic *TOPIC* --max-messages 10000 --consumer-property group.id=connect-*CONNECTOR NAME* > /dev/null

我想你可以通過使用--delete標誌首先刪除使用者組來向後移動偏移量。

不要忘記通過Kafka Connect REST API暫停並恢復您的連接器。

0

在我的情況下(測試讀取文件到生產者和控制檯消費,都在當地只),我剛看到這個生產者輸出:

offset.storage.file.filename=/tmp/connect.offsets 

所以我想打開它,但它是二進制的,與一些幾乎不可識別的字符。

我刪除它(重命名它也可以),然後我可以寫入同一個文件並從消費者那裏獲取文件內容。 您必須重新啓動控制檯生產者才能生效,因爲它試圖讀取偏移文件(如果不存在),請創建一個新文件,以便重置偏移量。

如果不想刪除重置它,你可以使用:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group-name> --reset-offsets --to-earliest --topic <topic_name> 

您可以通過檢查所有組名:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list 

,並檢查各組的細節:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group_name> --describe 

在生產環境中,此偏移由zookeeper管理,所以更多步驟(和c aution)是必要的。您可以參考這個網頁:

https://metabroadcast.com/blog/resetting-kafka-offsets https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html

步驟:

kafka-topics --list --zookeeper localhost:2181 
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic vital_signs --time -1 // -1 for largest, -2 for smallest 

set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset}