2016-06-23 42 views
0

我可以在命令行上發送和接收消息以對抗Kafka位置安裝。我也可以通過Java代碼發送消息。這些消息顯示在Kafka命令提示符中。我也有一個卡夫卡用戶的Java代碼。該代碼昨天收到消息。但是,它今天早上沒有收到任何消息。代碼沒有改變。我想知道屬性配置是否不完全正確。下面是我的配置:卡夫卡消費者 - 接收消息不一致

生產者:

bootstrap.servers - localhost:9092 
group.id - test 
key.serializer - StringSerializer.class.getName() 
value.serializer - StringSerializer.class.getName() 

和ProducerRecord設置爲

ProducerRecord<String, String>("test", "mykey", "myvalue") 

消費者:

zookeeper.connect - "localhost:2181" 
group.id - "test" 
zookeeper.session.timeout.ms - 500 
zookeeper.sync.time.ms - 250 
auto.commit.interval.ms - 1000 
key.deserializer - org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer - org.apache.kafka.common.serialization.StringDeserializer 

和Java代碼:

Map<String, Integer> topicCount = new HashMap<>(); 
    topicCount.put("test", 1); 

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer 
      .createMessageStreams(topicCount); 
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); 

缺什麼?

回答

0

許多事情可能正在發生。

首先,您的客戶的ZooKeeper會話超時時間非常短,這意味着由於垃圾收集暫停,消費者可能會遇到很多「軟故障」。發生這種情況時,消費者羣體將重新平衡,這可能會暫停消費。如果這種情況非常頻繁發生,消費者可能會進入一種不會消費信息的狀態,因爲它不斷地被重新平衡。我建議將ZooKeeper會話超時時間增加到30秒,以查看這是否可以解決問題。如果是這樣,您可以嘗試將其設置得更低。

其次,您是否可以確認新消息正在生成「測試」主題?您的消費者只會消費尚未提交的新消息。這個主題可能沒有任何新消息。

第三,您是否有同一消費羣體中的其他消費者可以處理消息?如果一個消費者經常發生軟故障,其他消費者將被分配其分區。

最後,您正在使用最終將被刪除的「舊」消費者。如果可能的話,我建議轉向卡夫卡0.9中提供的「新」消費者(KafkaConsumer.java)。雖然我不能保證這會解決你的問題。

希望這會有所幫助。