我在AWS上的DC/OS(Mesos)羣集上安裝了Kafka。啓用三個代理並創建了一個名爲「topic1」的主題。爲什麼使用Client API for Java時,消費者在使用來自Kafka的DC/OS消息時掛起?
dcos kafka topic create topic1 --partitions 3 --replication 3
然後我寫了一個Producer類來發送消息和一個Consumer類來接收它們。
public class Producer {
public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
Map<String, Object> producerConfig = new HashMap<>();
System.out.println("setting Producerconfig.");
producerConfig.put("bootstrap.servers",
"172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
ByteArraySerializer serializer = new ByteArraySerializer();
System.out.println("Creating KafkaProcuder");
KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
for (int i = 0; i < 100; i++) {
String msgstr = msg + i;
byte[] message = msgstr.getBytes();
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
System.out.println("Sent:" + msgstr);
kafkaProducer.send(record);
}
kafkaProducer.close();
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
sendMessage("Kafka test message 2/27 3:32");
}
}
public class Consumer {
public static String getMessage() {
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("bootstrap.servers",
"172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
consumerConfig.put("group.id", "dj-group");
consumerConfig.put("enable.auto.commit", "true");
consumerConfig.put("auto.offset.reset", "earliest");
ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);
kafkaConsumer.subscribe(Arrays.asList("topic1"));
while (true) {
ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
System.out.println(records.count() + " of records received.");
for (ConsumerRecord<byte[], byte[]> record : records) {
System.out.println(Arrays.toString(record.value()));
}
}
}
public static void main(String[] args) {
getMessage();
}
}
首先,我在集羣上運行Producer
將消息發送到topic1
。但是,當我運行Consumer
時,它無法接收任何內容,只是掛起。
Producer
工作,因爲我可以通過運行與卡夫卡附帶的shell腳本來獲取所有的信息安裝
./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning
但我爲什麼可以接收不Consumer
?這post建議與舊的偏移量group.id可能是一個可能的原因。我只在消費者而不是生產者中創建group.id。我如何配置這個組的偏移量?
爲了確保group.id不是問題,請使用'kafkaConsumer.seekToBeginning()' –
@ MatthiasJ.Sax我應該刪除'consumerConfig.put(「auto.offset.reset」,「earliest」); 「那麼?在訂閱後,我應該在哪裏添加此行?添加後仍然沒有得到任何東西。 – ddd
嘗試調用輪詢時使用更長的超時,並且在服務器/客戶端的日誌中是否存在任何異常? – amethystic