2017-07-03 201 views
0

我使用的彈簧卡夫卡爲消耗LinkedIn large message supported Kafka client春天卡夫卡不支持大消息的消費者

生產的消息給出,如它的構造,這卡夫卡客戶始終覆蓋AUTO_OFFSET_RESET_CONFIG無法比擬的。

private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs, 
    Deserializer<K> keyDeserializer, 
    Deserializer<V> valueDeserializer, 
    Deserializer<LargeMessageSegment> largeMessageSegmentDeserializer, 
    Auditor<K, V> consumerAuditor) { 
     _kafkaConsumer = new KafkaConsumer<>(configs.configForVanillaConsumer(), 
     byteArrayDeserializer, 
     byteArrayDeserializer); 
    } 
Map<String, Object> configForVanillaConsumer() { 
    Map<String, Object> newConfigs = new HashMap<>(); 
    newConfigs.putAll(this.originals()); 
    newConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
    newConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); 
    return newConfigs; 
} 

所以一旦我開始使用批提交併設置ENABLE_AUTO_COMMIT_CONFIG爲false,它引發以下錯誤:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener for group document-event-consumer failed on partition assignment org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: DocumentEvents-2 at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:369) at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:247) at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1602) at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265) at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.position(LiKafkaConsumerImpl.java:403) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:447) at com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener.onPartitionsAssigned(LiKafkaConsumerRebalanceListener.java:62) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.poll(LiKafkaConsumerImpl.java:231) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:558) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:745)

此問題的發生,因爲這是第一次爲這個消費羣體的消費信息從這個話題,所以它試圖使用偏移重置策略。

雖然我將它設置爲「最早」,它被認爲是由底層LinkedIn卡夫卡客戶端覆蓋爲「無」

我想也覆蓋ConsumerRebalanceListener手動設法開始在這種情況下,但實際上它沒有涉及到這一點。

我該如何解決這個問題?

回答

0

有趣;請在GitHub中打開一個問題。

如果政策是none,我們應該捕獲該異常。

與此同時,您可能只需使用普通客戶端一次即可解決該問題,以便爲該組實際設置初始偏移量(您不必實際接收任何消息,只需獲取分配分區並設置組的初始位置)。

+0

感謝加里,我開了一個[Github上的問題(https://github.com/spring-projects/spring-kafka/issues/359) 我用消費者的立場,並尋求方法來初始化爲消費者偏移第一次從主題分區消耗。 –