I am using Spring Kafka together with LinkedIn's large-message-supported Kafka client to consume messages, because plain Spring Kafka does not support consuming large messages. The problem is that this client always overrides AUTO_OFFSET_RESET_CONFIG in its constructor, so whatever value I configure never takes effect:
```java
private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
                            Deserializer<K> keyDeserializer,
                            Deserializer<V> valueDeserializer,
                            Deserializer<LargeMessageSegment> largeMessageSegmentDeserializer,
                            Auditor<K, V> consumerAuditor) {
  _kafkaConsumer = new KafkaConsumer<>(configs.configForVanillaConsumer(),
                                       byteArrayDeserializer,
                                       byteArrayDeserializer);
}
```

```java
Map<String, Object> configForVanillaConsumer() {
  Map<String, Object> newConfigs = new HashMap<>();
  newConfigs.putAll(this.originals());
  newConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  newConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
  return newConfigs;
}
```
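The consequence of `configForVanillaConsumer()` is that any `auto.offset.reset` value supplied by the user is silently replaced. A minimal sketch of that override pattern, using plain Java maps with literal config-key strings instead of the real Kafka classes:

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigOverrideDemo {
    // Mirrors configForVanillaConsumer(): copy the user's config, then force two keys.
    static Map<String, Object> configForVanillaConsumer(Map<String, Object> originals) {
        Map<String, Object> newConfigs = new HashMap<>(originals);
        newConfigs.put("enable.auto.commit", false);
        newConfigs.put("auto.offset.reset", "none");
        return newConfigs;
    }

    public static void main(String[] args) {
        Map<String, Object> userConfig = new HashMap<>();
        userConfig.put("auto.offset.reset", "earliest"); // what the user asked for
        Map<String, Object> effective = configForVanillaConsumer(userConfig);
        // The user's "earliest" has been replaced by "none".
        System.out.println(effective.get("auto.offset.reset")); // prints: none
    }
}
```

Because the forced values are written after `putAll`, there is no configuration the caller can pass to avoid the override.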
So once I started using batch commit and set ENABLE_AUTO_COMMIT_CONFIG to false, it threw the following error:
```
[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener for group document-event-consumer failed on partition assignment
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: DocumentEvents-2
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:369)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:247)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1602)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
    at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.position(LiKafkaConsumerImpl.java:403)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:447)
    at com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener.onPartitionsAssigned(LiKafkaConsumerRebalanceListener.java:62)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.poll(LiKafkaConsumerImpl.java:231)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:558)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)
```
This happens because it is the first time this consumer group consumes from this topic, so the consumer tries to apply the offset reset policy. Although I set that policy to "earliest", it is overridden to "none" by the underlying LinkedIn Kafka client, and with no committed offset and no reset policy the consumer throws NoOffsetForPartitionException.
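The failure mode can be seen in a self-contained sketch of the offset-resolution decision. This is my own simplified model, not Kafka code: use the committed offset if one exists, otherwise fall back to the reset policy, and with policy "none" the only remaining option is to throw.

```java
import java.util.Optional;

public class OffsetResetDemo {
    // Simplified decision: committed offset wins; otherwise the reset policy decides.
    static long resolveStartingOffset(Optional<Long> committed, String resetPolicy,
                                      long beginning, long end) {
        if (committed.isPresent()) {
            return committed.get();
        }
        switch (resetPolicy) {
            case "earliest": return beginning;
            case "latest":   return end;
            default: // stands in for NoOffsetForPartitionException
                throw new IllegalStateException("Undefined offset with no reset policy for partition");
        }
    }

    public static void main(String[] args) {
        // First-ever consumption by this group: no committed offset, policy forced to "none".
        try {
            resolveStartingOffset(Optional.empty(), "none", 0L, 100L);
        } catch (IllegalStateException e) {
            System.out.println("threw: " + e.getMessage());
        }
        // With "earliest" the consumer would simply start at the beginning.
        System.out.println(resolveStartingOffset(Optional.empty(), "earliest", 0L, 100L));
    }
}
```

Once the group has committed at least once, the policy is never consulted again, which is why the error only appears on the very first assignment.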
I also tried overriding the ConsumerRebalanceListener to seek to the beginning manually in this case, but execution never actually reaches that point.
How can I fix this problem?
Thanks Gary, I opened a [GitHub issue](https://github.com/spring-projects/spring-kafka/issues/359). I used the consumer's position and seek methods to initialize the offsets the first time the consumer reads from a topic partition.
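The position/seek approach mentioned in the comment can be sketched as follows. This is an illustration against hand-written stand-ins, not the real Kafka API: `MinimalConsumer` and `NoOffsetException` below are hypothetical stubs mimicking `position()` (which fails when no offset can be resolved) and `seek()`. On first assignment, probe the position and, if it cannot be resolved, seek to a chosen starting offset.

```java
import java.util.HashMap;
import java.util.Map;

public class FirstAssignmentSeekDemo {
    // Stand-in for NoOffsetForPartitionException (hypothetical stub).
    public static class NoOffsetException extends RuntimeException {}

    // Minimal stand-in for the consumer API surface the workaround needs.
    public interface MinimalConsumer {
        long position(String partition);          // throws NoOffsetException if unresolved
        void seek(String partition, long offset); // set the starting offset explicitly
    }

    // The workaround, called once per assigned partition (e.g. from onPartitionsAssigned):
    // reuse the committed position if it exists, otherwise initialize it ourselves.
    static long ensureOffsetInitialized(MinimalConsumer consumer, String partition, long fallbackOffset) {
        try {
            return consumer.position(partition);      // committed offset exists: use it
        } catch (NoOffsetException e) {
            consumer.seek(partition, fallbackOffset); // first read ever: initialize explicitly
            return fallbackOffset;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>(); // fake committed-offset store
        MinimalConsumer consumer = new MinimalConsumer() {
            public long position(String p) {
                Long o = offsets.get(p);
                if (o == null) throw new NoOffsetException();
                return o;
            }
            public void seek(String p, long off) { offsets.put(p, off); }
        };
        System.out.println(ensureOffsetInitialized(consumer, "DocumentEvents-2", 0L)); // first time: seeks
        System.out.println(ensureOffsetInitialized(consumer, "DocumentEvents-2", 0L)); // now resolved
    }
}
```

With the real client the same shape applies: catching the no-offset condition around `position()` inside the rebalance listener and calling `seek()` sidesteps the "none" reset policy that the LinkedIn client forces.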