2013-03-04 100 views
4

我一直在玩Apache Kafka幾天,這裏是我的問題, 如果我在「快速入門」部分中設置了本地測試在網站上,一切都很好,卡夫卡生產者/消費者,zookeeper服務器和kafka經紀人完美地工作。無法連接到卡夫卡生產商的遠程動物園管理員

現在,如果我在遠程服務器上運行(我們稱之爲節點2): - 動物園管理員 - 端口2181 - 卡夫卡經紀人 - 端口9092 - 卡夫卡消費者

然後,如果我從我的本地計算機上運行: - kafka生產者

假設node2上沒有防火牆。 連接結束超時。

以下是錯誤日誌:

/etc/java/jdk1.6.0_41/bin/java -Didea.launcher.port=7533 -Didea.launcher.bin.path=/home/kevin/Documents/idea-IU-123.169/bin -Dfile.encoding=UTF-8 -classpath /etc/java/jdk1.6.0_41/lib/dt.jar:/etc/java/jdk1.6.0_41/lib/tools.jar:/etc/java/jdk1.6.0_41/lib/jconsole.jar:/etc/java/jdk1.6.0_41/lib/htmlconverter.jar:/etc/java/jdk1.6.0_41/lib/sa-jdi.jar:/home/kevin/Desktop/kafka-0.7.2/examples/target/scala_2.8.0/classes:/home/kevin/Desktop/kafka-0.7.2/project/boot/scala-2.8.0/lib/scala-compiler.jar:/home/kevin/Desktop/kafka-0.7.2/project/boot/scala-2.8.0/lib/scala-library.jar:/home/kevin/Desktop/kafka-0.7.2/core/target/scala_2.8.0/classes:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/zookeeper-3.3.4.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/zkclient-0.1.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/snappy-java-1.0.4.1.jar:/home/kevin/Desktop/kafka-0.7.2/examples/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/home/kevin/Desktop/kafka-0.7.2/examples/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/home/kevin/Documents/idea-IU-123.169/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain kafka.examples.KafkaConsumerProducerDemo 
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection). 
log4j:WARN Please initialize the log4j system properly. 
Exception in thread "Thread-0" java.net.ConnectException: Connection timed out 
    at sun.nio.ch.Net.connect(Native Method) 
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532) 
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:173) 
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196) 
    at kafka.producer.SyncProducer.send(SyncProducer.scala:92) 
    at kafka.producer.SyncProducer.send(SyncProducer.scala:125) 
    at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:114) 
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100) 
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) 
    at kafka.producer.ProducerPool.send(ProducerPool.scala:100) 
    at kafka.producer.Producer.zkSend(Producer.scala:137) 
    at kafka.producer.Producer.send(Producer.scala:99) 
    at kafka.javaapi.producer.Producer.send(Producer.scala:103) 
    at kafka.examples.Producer.run(Producer.java:53) 

Process finished with exit code 0 

,這裏是我的製片代碼:

import java.util.Properties; 
import kafka.javaapi.producer.ProducerData; 
import kafka.producer.ProducerConfig; 


public class Producer extends Thread{ 

    private final kafka.javaapi.producer.Producer<String, String> producer; 
    private final String topic; 
    private final Properties props = new Properties(); 

    public Producer(String topic) 
    { 
    props.put("zk.connect", "node2:2181"); 
    props.put("connect.timeout.ms", "5000"); 
    props.put("socket.timeout.ms", "30000"); 
    props.put("serializer.class", "kafka.serializer.StringEncoder"); 
    props.put("producer.type", "sync"); 
    props.put("conpression.codec", "0"); 
    producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props)); 
    this.topic = topic; 
    } 

    public void run() { 
     String messageStr = new String("Message_test"); 
     producer.send(new ProducerData<String, String>(topic, messageStr)); 
    } 
} 

**所以我也測試通過

切換

props.put("zk.connect", "node2:2181"); 

props.put("broker.list", "0:node2:9082"); 

而且在這種情況下,我能夠成功連接。**

+0

你能解決這個問題嗎? – 2013-04-03 19:42:27

回答

3

見項目#3 http://kafka.apache.org/faq.html

的解決方法是明確設置主機屬性卡夫卡

的server.properties

您可以驗證這一點通過使用Zookeeper。如果您使用的是kafka 0.7 *,請打開ZkCli控制檯並獲取/ brokers/ids/0,並且您應該獲得所有經紀人元數據。確保IP地址/這裏的主機名匹配的ZK連接字符串使用的是在生產代碼 -

props.put("zk.connect", "node2:2181"); 

以我爲例,我是用我的本地機器連接到Ubuntu的虛擬機上運行的生產商(同一個盒子,不同的IP),並且此解決方法有所幫助。

相關問題