我想禁用kafka SimpleConsumer的自動提交。我正在使用0.8.1版本。對於高級消費者,可以通過consumerConfig來設置和傳遞配置選項,如下所示 kafka.consumer.Consumer.createJavaConsumerConnector(this.consumerConfig);如何禁用SimpleConsumer的自動提交kafka 0.8.1
如何爲SimpleConsumer實現相同?我主要想禁用自動提交。我嘗試在consumer.properties中將auto commit設置爲false,並重新啓動kafka服務器,zookeeper和producer。但是,這是行不通的。我想我需要通過代碼應用此設置,而不是在consumer.properties中。 任何人都可以在這裏幫忙嗎?
這裏是我的代碼看起來像
List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
topicAndPartitionList.add(topicAndPartition);
OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0, correlationId, clientName));
Map<TopicAndPartition, OffsetMetadataAndError> offsets = offsetFetchResponse.offsets();
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(a_topic, a_partition, offsets.get(topicAndPartition).offset(), 100000) .build();
long readOffset = offsets.get(topicAndPartition).offset();
FetchResponse fetchResponse = consumer.fetch(req);
//Consume messages from fetchResponse
Map<TopicAndPartition, OffsetMetadataAndError > requestInfo = new HashMap<> ();
requestInfo.put(topicAndPartition, new OffsetMetadataAndError(readOffset, "metadata", (short)0));
OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(new OffsetCommitRequest("testGroup", requestInfo, (short)0, correlationId, clientName));
如果上面的代碼中的崩潰犯偏移之前,我仍然得到最新作在下次運行offsets.get(topicAndPartition).offset(),它使我的結果偏移認爲在執行代碼時發生偏移的自動提交。
我在上面添加了代碼片段。如果我的應用程序在提交偏移之前崩潰,我仍然會在下一次運行中獲得最新的偏移量。所以,我認爲自動提交在內部發生,如果它沒有明確禁用。 – Vikram
這是因爲配置'auto.offset.reset'默認設置爲最大,這意味着如果沒有初始偏移量存儲在Zookeeper上(版本0總是存儲到zookeeper),則消費者將其偏移重置爲最大偏移量 – amethystic
即使有多次運行當提交抵消成功較早時,我看到相同的行爲。不要commitOffset寫入zookeeper? – Vikram