2017-07-13 85 views
0

提供了一個有n個分區的主題。 Spring的KafkaConsumer偵聽器可以通過哪種方式一次偵聽來自同一個/一個分區的多個消息?單個Spring的KafkaConsumer偵聽器可以偵聽來自同一個/一個分區的多條消息嗎?

我試過ConcurrentKafkaListenerContainerFactory通過設置setBatchListener(true);但消費者已經開始消費來自不同分區而不是一個分區的多個消息。

public class BatchReceiverConfig { 

    @Value("${kafka.bootstrap-servers}") 
    private String bootstrapServers; 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 

     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch4"); 
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     // maximum records per batch receive 
     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); 

     return props; 
    } 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, String> factory = 
       new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(2); 
     // enable batch listeners 
//  factory.setBatchListener(true); 

     return factory; 
    } 

    @Bean 
    public BatchReceiver receiver() { 
     return new BatchReceiver(); 
    } 
} 


/* Listener */ 
@KafkaListener(id = "batch-listener", topics = TOPIC_TEST_BATCH) 
    public void receive(List<String> data, 
         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, 
         @Header(KafkaHeaders.OFFSET) List<Long> offsets) { 

     LOGGER.info("start of batch receive"); 
     for (int i = 0; i < data.size(); i++) { 
      LOGGER.info("received message='{}' with partition-offset='{}'", data.get(i), 
        partitions.get(i) + "-" + offsets.get(i)); 
      // handle message 

      latch.countDown(); 
     } 
     LOGGER.info("end of batch receive"); 
    } 
+0

重組你的問題:「怎麼能一個偵聽器只從一個分區消耗的消息?」。那是對的嗎? – Arek

+0

這是正確的,但在春季卡夫卡模塊的上下文中。 – Rites

回答

0

只要將只有一個組中的用戶,那麼你可以設置該ConcurrentKafkaListenernconcurrencyLevel

+0

它沒有工作,我相信'max.poll.records'是決定提取記錄數量的屬性。 – Rites

+0

演示如何配置Spring Kafka和Kafka消費者本身。 – Arek

+0

本身已添加片段。 – Rites

0

如果使用組管理,卡夫卡將分配分區的消費者。每個使用者(或使用併發時的線程)將獲得0個,1個或多個分區,具體取決於有多少個使用者和分區。

如果你想要一個特定的消費者從只有一個分區得到的消息,你必須指定分區的自己,而不是讓卡夫卡做作業。

如果使用併發容器;您將不得不分配與併發相同數量的分區,因此每個線程只能獲得一個分區。

+0

在問題中發佈的代碼片段中,分區數量=併發性,即使它不起作用。另外,消費者數量=分區數量。 – Rites

+0

如果您正在使用組管理,分區分佈將需要時間;最初,消費者將獲得他們全部;唯一的保證是自己分配分區,而不是使用組管理。 'props.put(ConsumerConfig.GROUP_ID_CONFIG,「batch4」);' –

+0

在集羣環境中,很難將其定義爲相同的代碼部署到所有節點。有沒有辦法動態配置它? – Rites

相關問題