2017-07-18 70 views
0

我正在使用卡夫卡製作者10.2.1創建主題並寫入主題,當我創建主題時我得到以下錯誤,但主題被創建:在創建主題時獲取卡夫卡製作者中的錯誤錯誤,但在卡夫卡服務器上創建了主題

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:774) 
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:494) 
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440) 
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360) 
    at kafka.AvroProducer.produce(AvroProducer.java:47) 
    at samples.TestMqttSource.messageReceived(TestMqttSource.java:89) 
    at mqtt.JsonConsumer.messageArrived(JsonConsumer.java:132) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:477) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:380) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:184) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
msg org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
loc org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
cause org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
excep java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 

所有的建議,非常感謝。

+0

生產者不能創建主題。管理客戶端API可以做到這一點。創建主題是因爲在代理上啓用了主題自動創建(auto.create.topics.enable屬性)(默認情況下)。你能顯示代碼嗎? – ppatierno

+0

感謝您的評論。我找到了一個解決方案,我試圖在下面的答案的評論中解釋「問題」。 – Margit

回答

0

不能使用KafkaProducer來創​​建一個話題(所以我不太清楚你如何設法創建主題,除非你通過不同的方法,這樣做是以前,如卡夫卡管理shell腳本)。而是使用Kafka庫提供的AdminUtils。

我最近取得了這兩個要求,你會很驚訝它是多麼容易實現。以下是一個簡單的代碼示例,向您展示如何通過AdminUtils創建主題,以及如何寫入主題。

class Foo { 

    private String TOPIC = "testingTopic"; 
    private int NUM_OF_PARTITIONS = 10; 
    private int REPLICATION_FACTOR = 1; 

    public Foo() { 


     ZkClient zkClient = new ZkClient("localhost:2181", 15000, 10000, ZKStringSerializer$.MODULE$); 
     ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection("localhost:2181"), false); 

     if (!AdminUtils.topicExists(zkUtils, TOPIC)) { 
      try { 
       AdminUtils.createTopic(zkUtils, TOPIC, NUM_OF_PARTITIONS, REPLICATION_FACTOR, new Properties(), Enforced$.MODULE$); 

       Properties producerConfig = new Properties(); 

       producerConfig.put(ProducerConfig.BOOTSTRAP_SERVER_CONFIG, "localhost:9092"); 
       producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 
       producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 

       KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig); 

       // This is just to show you how to write but you could be more elaborate 
       int i = 0; 

       while (i < 11) { 
        ProducerRecord<String, String> rec = new ProducerRecord<>(TOPIC, ("This is line number " + i)); 
        producer.send(rec); 
        i++; 
       } 

       producer.closer(); 
      } catch (AdminOperationException aoe) { 
       aoe.printStackTrace(); 
      } 
     } 

    } 

} 

請記住,如果您想刪除主題,默認情況下在設置中禁用。啓動Kafka時使用的配置文件(默認情況下爲$ {kafka_home} /config/server.properties),如果它尚不存在並且設置爲false或註釋掉,則添加以下行:

delete.topic.enabled=true 

然後您必須重新啓動服務器,並且可以通過Java或提供的命令行工具刪除主題。

NB

它總是一個好主意,關閉生產者/消費者,當你與他們完成了,如圖所示的代碼示例。

+0

感謝您的答案和評論。我們的卡夫卡設置爲從消息中自動生成主題。我發現了什麼問題,我在Windows PC上運行客戶端,並且在連接到Kafka時使用了Kafka服務器的IP地址,但我認爲在創建主題的元數據中, kafkaserver名稱被返回,因此我將Kafka服務器的IP地址和主機名添加到我的主機文件中,並且它可以正常工作。 – Margit

+0

啊,沒關係,發生這種事情有點不尋常!但很高興這一切工作 –