2017-02-27 77 views
2

我在AWS上的DC/OS(Mesos)羣集上安裝了Kafka。啓用三個代理並創建了一個名爲「topic1」的主題。爲什麼使用Client API for Java時,消費者在使用來自Kafka的DC/OS消息時掛起?

dcos kafka topic create topic1 --partitions 3 --replication 3 

然後我寫了一個Producer類來發送消息和一個Consumer類來接收它們。

public class Producer { 
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException { 
     Map<String, Object> producerConfig = new HashMap<>(); 
     System.out.println("setting Producerconfig."); 
     producerConfig.put("bootstrap.servers", 
       "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636"); 

     ByteArraySerializer serializer = new ByteArraySerializer(); 
     System.out.println("Creating KafkaProcuder"); 
     KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer); 
     for (int i = 0; i < 100; i++) { 
      String msgstr = msg + i; 
      byte[] message = msgstr.getBytes(); 
      ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message); 
      System.out.println("Sent:" + msgstr); 
      kafkaProducer.send(record); 
     } 
     kafkaProducer.close(); 
    } 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     sendMessage("Kafka test message 2/27 3:32"); 
    } 

} 

public class Consumer { 
    public static String getMessage() { 
     Map<String, Object> consumerConfig = new HashMap<>(); 
     consumerConfig.put("bootstrap.servers", 
       "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636"); 
     consumerConfig.put("group.id", "dj-group"); 
     consumerConfig.put("enable.auto.commit", "true"); 
     consumerConfig.put("auto.offset.reset", "earliest"); 
     ByteArrayDeserializer deserializer = new ByteArrayDeserializer(); 
     KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer); 

     kafkaConsumer.subscribe(Arrays.asList("topic1")); 
     while (true) { 
      ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100); 
      System.out.println(records.count() + " of records received."); 
      for (ConsumerRecord<byte[], byte[]> record : records) { 
       System.out.println(Arrays.toString(record.value())); 
      } 
     } 
    } 

    public static void main(String[] args) { 
     getMessage(); 
    } 
} 

首先,我在集羣上運行Producer將消息發送到topic1。但是,當我運行Consumer時,它無法接收任何內容,只是掛起。

Producer工作,因爲我可以通過運行與卡夫卡附帶的shell腳本來獲取所有的信息安裝

./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning 

但我爲什麼可以接收不Consumer?這post建議與舊的偏移量group.id可能是一個可能的原因。我只在消費者而不是生產者中創建group.id。我如何配置這個組的偏移量?

+0

爲了確保group.id不是問題,請使用'kafkaConsumer.seekToBeginning()' –

+0

@ MatthiasJ.Sax我應該刪除'consumerConfig.put(「auto.offset.reset」,「earliest」); 「那麼?在訂閱後,我應該在哪裏添加此行?添加後仍然沒有得到任何東西。 – ddd

+0

嘗試調用輪詢時使用更長的超時,並且在服務器/客戶端的日誌中是否存在任何異常? – amethystic

回答

1

事實證明,kafkaConsumer.subscribe(Arrays.asList("topic1"));導致poll()掛起。根據Kafka Consumer does not receive messages ,有兩種連接主題的方式,assignsubscribe。用下面的代碼替換subscribe後,它開始工作。

TopicPartition tp = new TopicPartition("topic1", 0); 
    List<TopicPartition> tps = Arrays.asList(tp); 
    kafkaConsumer.assign(tps); 

但是,輸出顯示不是預期的數字數組(生產者發送字符串)。但我想這是一個單獨的問題。

+1

所謂的「單獨問題」是你正在接收字節(因爲Kafka在引擎蓋下處理字節)。您應該使用解串器,例如'key.deserializer = org.apache.kafka.common.serialization.StringDeserializer'爲鍵值和一個單獨的值。請參閱http://kafka.apache.org/documentation/(但我無法找到SerDe的確切頁面)。 –

+1

@JacekLaskowski感謝您的解釋。爲我保存了另一篇文章 – ddd

相關問題