2016-12-06 61 views
1

我想禁用kafka SimpleConsumer的自動提交。我正在使用0.8.1版本。對於高級消費者,可以通過consumerConfig來設置和傳遞配置選項,如下所示 kafka.consumer.Consumer.createJavaConsumerConnector(this.consumerConfig);如何禁用SimpleConsumer的自動提交kafka 0.8.1

如何爲SimpleConsumer實現相同?我主要想禁用自動提交。我嘗試在consumer.properties中將auto commit設置爲false,並重新啓動kafka服務器,zookeeper和producer。但是,這是行不通的。我想我需要通過代碼應用此設置,而不是在consumer.properties中。 任何人都可以在這裏幫忙嗎?

這裏是我的代碼看起來像

List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
    
topicAndPartitionList.add(topicAndPartition);
 
OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new  OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0, correlationId, clientName)); 


Map<TopicAndPartition, OffsetMetadataAndError> offsets =  offsetFetchResponse.offsets();


 
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
    .addFetch(a_topic, a_partition, offsets.get(topicAndPartition).offset(), 100000) .build(); 

long readOffset = offsets.get(topicAndPartition).offset();

 
FetchResponse fetchResponse = consumer.fetch(req); 

//Consume messages from fetchResponse 


Map<TopicAndPartition, OffsetMetadataAndError > requestInfo = new HashMap<> ();
 
requestInfo.put(topicAndPartition, new OffsetMetadataAndError(readOffset, "metadata", (short)0)); 
OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(new   OffsetCommitRequest("testGroup", requestInfo, (short)0, correlationId, clientName));
 

如果上面的代碼中的崩潰犯偏移之前,我仍然得到最新作在下次運行offsets.get(topicAndPartition).offset(),它使我的結果偏移認爲在執行代碼時發生偏移的自動提交。

回答

1

使用SimpleConsumer只是意味着您要關注包括偏移提交在內的所有消息消息,因此低級API不支持自動提交。

+0

我在上面添加了代碼片段。如果我的應用程序在提交偏移之前崩潰,我仍然會在下一次運行中獲得最新的偏移量。所以,我認爲自動提交在內部發生,如果它沒有明確禁用。 – Vikram

+0

這是因爲配置'auto.offset.reset'默認設置爲最大,這意味着如果沒有初始偏移量存儲在Zookeeper上(版本0總是存儲到zookeeper),則消費者將其偏移重置爲最大偏移量 – amethystic

+0

即使有多次運行當提交抵消成功較早時,我看到相同的行爲。不要commitOffset寫入zookeeper? – Vikram