2016-12-29 59 views
4

我做了新的安裝Apache Kafka 0.10.1.0。瞭解消費者組ID

我能夠在命令提示符下發送/接收消息。

在使用生產者/消費者Java示例時,我無法知道消費者示例中的group.id參數。

讓我知道如何解決這個問題。

下面是方示例我用了:

public static void main(String[] args) { 
      Properties props = new Properties(); 
      props.put("bootstrap.servers", "localhost:9092"); 
      props.put("group.id", "my-topic"); 
      props.put("enable.auto.commit", "true"); 
      props.put("auto.commit.interval.ms", "1000"); 
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
      try { 
       consumer.subscribe(Arrays.asList("my-topic")); 

        ConsumerRecords<String, String> records = consumer.poll(100); 
        System.err.println("records size=>"+records.count()); 
        for (ConsumerRecord<String, String> record : records) 
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 



       } 
      catch (Exception ex){ 
       ex.printStackTrace(); 
      } 
      finally { 
       consumer.close(); 
      } 
     } 

消費運行命令後,我可以看到張貼的生產者消息(控制檯)。但無法看到從Java程序中的消息

斌\ WINDOWS \卡夫卡控制檯consumer.bat --bootstrap-服務器localhost:9092 --topic我的話題--from-開始

+0

如果您運行Java消費者並在啓動它後生成一些消息,您仍然看不到任何消息正在被消耗? –

+0

是的,我得到控制檯上的消息「records size => 0」 – Ankit

回答

7

消費者使用消費者組名稱標記自己,並且發佈到主題的每個記錄 被遞送到訂閱消費者組中的每個 內的一個消費者實例。消費者實例可以獨立於 進程或單獨的機器上。

如果所有消費者實例具有相同的消費者組,那麼 記錄將有效地在消費者實例上進行負載均衡。

如果所有消費者實例具有不同的消費者羣體,則每個記錄將被廣播到所有消費者進程。

group.id是一個字符串,用於唯一標識該使用者所屬的消費者進程組。

Kafka intro

+0

但是,我怎麼會知道這個字符串,因爲我發送郵件時沒有提到任何group.id? – Ankit

+0

您可以使用任何字符串,如果您將運行具有相同字符串的兩個使用者,它們將位於同一組中。 –

+0

將group.id放在上面的java程序後,我無法看到消息。但是,我可以在控制檯上看到執行命令(上面提到)的消息。具有諷刺意味的是,沒有group.id需要查看消費者的消息。 – Ankit

0

在你提供你只是等待一次數據超過100ms的代碼。 您應該在循環中接收數據或等待較長時間(在這種情況下,您只能獲得一部分數據)。 至於'group.id',你從控制檯運行消費者的情況下,它會隨機獲得'group.id'。

0

由於沒有提供偏移量,Java客戶端將等待新消息,但不會顯示現有消息 - 這是預期的。如果一個打算已經閱讀所有郵件的主題中可以使用這段代碼:

if (READ_FROM_BEGINNING) { 
    //consume all the messages from the topic from the beginning. 
    //this doesn't work reliably if it consumer.poll(..) is not called first 
    //probably because of lazy-loading issues    
    consumer.poll(10); 
    consumer.seekToBeginning(consumer.assignment()); //if intending to 
    //read from the beginning or call below to read from a predefined offset. 
    //consumer.seek(consumer.assignment().iterator().next(), READ_FROM_OFFSET); 
} 
0

給任何隨機值,組ID ......沒關係

props.put( 「group.id」,「任意隨機值」);