2016-07-26 98 views
0

我想從我的本地機器連接到卡夫卡:弗林克,卡夫卡和動物園管理員與URI

kafkaParams.setProperty("bootstrap.servers", Defaults.BROKER_URL) 
kafkaParams.setProperty("metadata.broker.list", Defaults.BROKER_URL) 
kafkaParams.setProperty("group.id", "group_id") 
kafkaParams.setProperty("auto.offset.reset", "earliest") 

完全正常的,但我的BROKER_URI定義如下my-server.com:1234/my/subdirectory

我發現這種現象被稱爲chroot路徑。

它引發以下錯誤:Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: my-server.com:1234/my/subdirectory

如何解決這個問題?

這是我的依賴關係:

val flinkVersion = "1.0.3" 

"org.apache.flink" %% "flink-scala" % flinkVersion % "provided", 
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", 
"org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion, 

回答

0

只是儘量不帶路徑上下文和斜線host:port格式。如果你有一個以上的服務器,這將是一個列表host1:port1,host2:port2

參考:http://kafka.apache.org/documentation.html

+0

這提供了以下錯誤:'異常線程「main」 org.apache.kafka.common.errors.TimeoutException:超時過期而獲取主題元數據' –

+0

這表明配置的格式沒問題。接下來要看的是如果在kafka實例上運行iptables或防火牆。你可以在你的客戶端中telnet kafka實例嗎? –

+0

有趣的是,我可以使用Kafka控制檯消費者進行連接:'./kafka-console-consumer.sh --zookeeper my-server.com:1234/my/subdirectory --topic my-topic --from-beginning'工作得很好。和Telnet也工作正常:'telnet my-server.com 1234' –

0

bootstrap.servers應該是一個逗號分隔的列表如下所示:address1:port1,address2:port2,...,addressn:portn。如果您只有一個Kafka經紀人,您應該輸入類似localhost:9092(除非您將Kafka配置爲在另一個端口上運行)。

你可以參考this post from dataArtisans瞭解更多關於如何讓Flink和Kafka一起工作的細節。

0

笨。 Zookeeper!=卡夫卡。正如你在代碼中看到的那樣,我使用了兩次相同的URL,但事實證明它們應該是不同的。

我想從我的本地機器連接到卡夫卡:

kafkaParams.setProperty("bootstrap.servers", Defaults.KAFKA_URL) 
kafkaParams.setProperty("metadata.broker.list", Defaults.ZOOKEEPER_URL) 
kafkaParams.setProperty("group.id", "group_id") 
kafkaParams.setProperty("auto.offset.reset", "earliest") 
+1

;)你會請發佈正確的代碼? –

+0

Ofcourse :-),它被添加 –