2017-01-02 95 views
1

我需要從使用Scala和Spark的遠程Kafka隊列主題中消費消息。默認情況下,遠程計算機上的Kafka端口設置爲7072,而不是9092。此外,遠程機器上有安裝以下版本:無法將代理列表參數從Scala傳遞到Kafka:屬性bootstrap.servers無效

  1. 卡夫卡0.10.1.0
  2. 斯卡拉2.11

這意味着我應該通過經紀人列表(與該端口7072)從斯卡拉遠程卡夫卡,因爲否則它會嘗試使用默認端口。

問題是根據日誌,參數bootstrap.servers無法被遠程機器識別。我還嘗試將此參數重命名爲metadata.broker.list,broker.listlisteners,但始終在日誌中出現相同的錯誤Property bootstrap.servers is not valid,然後默認使用端口9092(並且顯然不會消耗這些消息)。

在POM文件我用卡夫卡以下依賴和星火:

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming-kafka_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

所以,我用Scala的2.10,而不是2.11。

這是我的Scala代碼(它工作絕對沒問題,如果我使用安裝在亞馬遜雲我自己卡夫卡在那裏我有EMR機(那裏我已經用了卡夫卡的港口9092)):

val testTopicMap = testTopic.split(",").map((_, kafkaNumThreads.toInt)).toMap 

    val kafkaParams = Map[String, String](
     "broker.list" -> "XXX.XX.XXX.XX:7072", 
     "zookeeper.connect" -> "XXX.XX.XXX.XX:2181", 
     "group.id" -> "test", 
     "zookeeper.connection.timeout.ms" -> "10000", 
     "auto.offset.reset" -> "smallest") 

    val testEvents: DStream[String] = 
     KafkaUtils 
     .createStream[String, String, StringDecoder, StringDecoder](
     ssc, 
     kafkaParams, 
     testTopicMap, 
     StorageLevel.MEMORY_AND_DISK_SER_2 
    ).map(_._2) 

我正在閱讀this Documentation,但看起來我所做的一切都是正確的。我應該使用其他一些Kafka客戶端API(其他Maven依賴項)嗎?

更新#1:

我也試過直接流(無動物園管理員),但它運行我進了錯誤:

val testTopicMap = testTopic.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072","bootstrap.servers" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072", 
             "auto.offset.reset" -> "smallest") 
val testEvents = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, testTopicMap).map(_._2) 

testEvents.print() 

17/01/02 12:23:15 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 

更新#2:

我發現了這個相關的話題。建議的解決方案說Fixed it by setting the property 'advertised.host.name' as instructed by the comments in the kafka configuration (config/server.properties)。我是否正確理解config/server.properties應該在安裝有Kafka的遠程機器上更改

Kafka : How to connect kafka-console-consumer to fetch remote broker topic content?

回答

0

我覺得我碰到了同樣的問題,最近(EOFException類),原因是卡夫卡的版本不匹配。

如果我看這裏https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10/1.6.2 kafka streaming版本的編譯時間依賴性是0.8而您使用0.10。

就我所知0.9已經不兼容0.8。你可以嘗試設置一個本地的0.8或0.9代理並嘗試連接?

+0

爲了進行測試,我在本地安裝了Kafka 0.10.1.0-2.11,然後在config/server中更改了參數'listeners'中的端口。屬性「到」7072「,最後我用問題中提到的POM執行我的代碼。我能夠沒有任何問題地獲得消息。所以,我放棄了版本的兼容性問題。爲了確保我能夠很好地解釋我自己:我在AWS EMR羣集上運行消費者代碼,而Kafka羣集是AWS之外的另一臺機器。 – Dinosaurius

+0

我在想這與'security.protocol'有關。例如,如果遠程Kafka集羣使用'SSL'協議,那麼顯然我的連接將失敗。在這種情況下,我唯一會誤解的是爲什麼我能夠使用'curl'和Kafka-Rest-API(http://docs.confluent.io/2.0.0/kafka-rest/docs/api)來檢索郵件。 HTML#消費者)從終端(沒有斯卡拉)。也許它與遠程集羣中Confluent API的設置有關(即它不使用SSL)。 – Dinosaurius