2017-09-04 68 views
0

根據Spring Kafka文檔設置卡夫卡ConsumerFactory。 但是,groupId似乎沒有被使用。也許我只是把整件事情弄錯了,所以我想讓你知道我的經歷。卡夫卡消費者未加入自定義組標識

這是我的配置似乎並沒有工作:

@Bean 
ConsumerFactory<String, KafkaEvent> kafkaEventConsumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(
      getConsumerProperties(), 
      new StringDeserializer(), 
      new JsonDeserializer<>(KafkaEvent.class)); 
} 

Map<String, Object> getConsumerProperties() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // TODO 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId"); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 


    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3); 
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000); 

    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 45000); 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 70000); 

    return props; 
} 

我有配置這樣的@KafkaEventListener,無需再次明確指定的groupId:

@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC) 
public class KafkaEventListener { 

    @Autowired 
    private ConsumerFactory<String, KafkaEvent> consumerFactory; 

    @KafkaHandler 
    public void listenTo(@Payload KafkaEvent event) { 
     LOGGER.error(LogMarker.KAFKA, consumerFactory.getConfigurationProperties().toString()); 
    } 

} 

我也能看到我的groupId「myGroupId」包含在上面記錄的錯誤日誌中。然而,令我懷疑的是一些ConsumerCoordinator的DEBUG日誌記錄,它總是聲明加入一個不同的groupId,我有點擔心這看起來是正確的。

2017-09-04 15:28:13.904 ( ) INFO consumer.internals.AbstractCoordinator    - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40 
2017-09-04 15:28:13.904 ( ) INFO consumer.internals.AbstractCoordinator    - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40 
2017-09-04 15:28:13.906 ( ) INFO consumer.internals.ConsumerCoordinator    - Setting newly assigned partitions [] for group org.springframework.kafka.KafkaListenerEndpointContainer#0 
2017-09-04 15:28:13.907 ( ) INFO consumer.internals.ConsumerCoordinator    - Setting newly assigned partitions [my-topic-0] for group org.springframework.kafka.KafkaListenerEndpointContainer#0 

同樣在Spring啓動時輸出ConsumerConfig。我可以看到groupId是錯誤的,但其他屬性被正確接管。

據我瞭解,我可以通過在ConsumerFactory上設置groupId或使用spring.kafka.consumer.group-id在application.properties中設置它來設置全局groupId。儘管兩種變體都不起作用。

只有當我配置使用的groupId @KafkaListener註釋的LOG指出,消費者加入了正確的組:

2017-09-04 15:38:30.787 ( ) DEBUG consumer.internals.AbstractCoordinator    - Received successful JoinGroup response for group myGroupId: [email protected] 

有了這個配置:

@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC, groupId = "myGroupId") 

我們正在使用Spring 2.0.0啓動.M3(因此,Spring Kafka 2.0.0.M3)

回答

1

這是M3中的一個bug; fixed on master(2.0.3.BUILD-SNAPSHOT)(和在1.3.0.M2中)。我們期待本週晚些時候發佈2.0.0.RC1候選版本(等待Spring Framework RC4)。