2016-08-24 152 views
1

我想用Apache Kafka和Spark流設置一個流應用程序。 Kafka運行在一個單獨的unix機器版本0.9.0.1上,spark v1.6.1是hadoop集羣的一部分。卡夫卡與火花集成

我已經啓動了zookeeper和kafka服務器,並且想要使用控制檯生產者從日誌文件中傳輸消息並使用直接方法(無接收器)使用Spark應用程序使用該消息。我已經用Python寫的代碼,並使用下面的命令執行:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar streamingDirectKafka.py 

得到如下錯誤:

/opt/mapr/spark/spark-1.6.1/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 152, in createDirectStream 
py4j.protocol.Py4JJavaError: An error occurred while calling o38.createDirectStreamWithoutMessageHandler. 
: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker 

能否請你幫忙嗎?

謝謝!

from pyspark import SparkContext, SparkConf 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

if __name__ == "__main__": 
    conf = SparkConf().setAppName("StreamingDirectKafka") 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 1) 

    topic = ['test'] 
    kafkaParams = {"metadata.broker.list": "apsrd7102:9092"} 
    lines = (KafkaUtils.createDirectStream(ssc, topic, kafkaParams) 
         .map(lambda x: x[1])) 
    counts = (lines.flatMap(lambda line: line.split(" ")) 
        .map(lambda word: (word, 1)) 
        .reduceByKey(lambda a, b: a+b)) 
    counts.pprint() 

    ssc.start() 
    ssc.awaitTermination() 

回答

0

看起來你使用的是不兼容的卡夫卡版本。從Spark 2.0開始的文檔 - 支持Kafka 0.8.x。

http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources

+0

ü意味着我應該使用卡夫卡版本0.8.4星火版本1.6.1呢? 此外,當我運行字數例如,在Scala中,我得到不同的錯誤: 異常在線程「主要」 java.lang.ClassCastException:kafka.cluster.BrokerEndPoint不能轉換到kafka.cluster.Broker 命令: bin/run-example streaming.DirectKafkaWordCount apsrd7102:9092 test Thanks !! –

+0

即使使用kafka版本0.8.2.1,我也遇到同樣的錯誤? 你可以請幫忙,如果可能是我使用不正確的jar的依賴? –