我可以在命令行上發送和接收消息以對抗Kafka位置安裝。我也可以通過Java代碼發送消息。這些消息顯示在Kafka命令提示符中。我也有一個卡夫卡用戶的Java代碼。該代碼昨天收到消息。但是,它今天早上沒有收到任何消息。代碼沒有改變。我想知道屬性配置是否不完全正確。下面是我的配置:卡夫卡消費者 - 接收消息不一致
生產者:
bootstrap.servers - localhost:9092
group.id - test
key.serializer - StringSerializer.class.getName()
value.serializer - StringSerializer.class.getName()
和ProducerRecord設置爲
ProducerRecord<String, String>("test", "mykey", "myvalue")
消費者:
zookeeper.connect - "localhost:2181"
group.id - "test"
zookeeper.session.timeout.ms - 500
zookeeper.sync.time.ms - 250
auto.commit.interval.ms - 1000
key.deserializer - org.apache.kafka.common.serialization.StringDeserializer
value.deserializer - org.apache.kafka.common.serialization.StringDeserializer
和Java代碼:
Map<String, Integer> topicCount = new HashMap<>();
topicCount.put("test", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer
.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
缺什麼?