1
我想用Apache Kafka和Spark流設置一個流應用程序。 Kafka運行在一個單獨的unix機器版本0.9.0.1上,spark v1.6.1是hadoop集羣的一部分。卡夫卡與火花集成
我已經啓動了zookeeper和kafka服務器,並且想要使用控制檯生產者從日誌文件中傳輸消息並使用直接方法(無接收器)使用Spark應用程序使用該消息。我已經用Python寫的代碼,並使用下面的命令執行:
spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar streamingDirectKafka.py
得到如下錯誤:
/opt/mapr/spark/spark-1.6.1/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 152, in createDirectStream
py4j.protocol.Py4JJavaError: An error occurred while calling o38.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
能否請你幫忙嗎?
謝謝!
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
conf = SparkConf().setAppName("StreamingDirectKafka")
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 1)
topic = ['test']
kafkaParams = {"metadata.broker.list": "apsrd7102:9092"}
lines = (KafkaUtils.createDirectStream(ssc, topic, kafkaParams)
.map(lambda x: x[1]))
counts = (lines.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a+b))
counts.pprint()
ssc.start()
ssc.awaitTermination()
ü意味着我應該使用卡夫卡版本0.8.4星火版本1.6.1呢? 此外,當我運行字數例如,在Scala中,我得到不同的錯誤: 異常在線程「主要」 java.lang.ClassCastException:kafka.cluster.BrokerEndPoint不能轉換到kafka.cluster.Broker 命令: bin/run-example streaming.DirectKafkaWordCount apsrd7102:9092 test Thanks !! –
即使使用kafka版本0.8.2.1,我也遇到同樣的錯誤? 你可以請幫忙,如果可能是我使用不正確的jar的依賴? –