Parsing a JSON stream with pyspark

I'm very new to Spark Streaming, and I'm trying to read and analyze a JSON stream from Kafka using pyspark. Reading the stream works fine, and I can pprint() the RDDs. The records look like this:
{"Address":"22.79.52.79","AlarmProfile":"-1","Amps":"11.98","AmpsLimit":"90","AssetTag":"-1","AssetTag_1":"-1","Blank":"0","CAN":"0","Chain":"2","Config":"\u003cUnknown\u003e",...,"WattsLimit":"-1"}
I would like to parse the JSON so that I can use, for example, my_parsed_json["Amps"],
but I don't know how to apply json.loads() to the records.
I run the script this way:
/data/spark/bin/spark-submit --master spark://des01:7077 --total-executor-cores 2 --jars /data/dev/2.10/spark-streaming-kafka-assembly_2.10-1.5.2.jar test.py pkb01:9092 topicname
where "pkb01:9092" is the Kafka broker and "topicname" is the Kafka topic.
My Python code:
from __future__ import print_function
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# sc is the Spark Context
sc = SparkContext(appName="mitest")
ssc = StreamingContext(sc, 2)
brokers, topico = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topico], {"metadata.broker.list": brokers})
dstream = kvs.map(lambda x: x[1])
dstream.pprint()
I would like to have something like this:
my_parsed_json = dstream.map(lambda x: json.loads(x))
but Spark throws an error. Any help?
The error says:
Traceback (most recent call last):
File "/home/spark/test.py", line 28, in <module>
ssc.start()
File "/data/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 237, in start
File "/data/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/data/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o21.start.
: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:551)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:609)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:608)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:623)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
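For reference, the records themselves parse fine with plain json.loads() outside Spark, so the problem is not the JSON format. A quick check using a shortened version of the sample record above (the full record has many more fields, elided here):

```python
import json

# Shortened version of the sample Kafka record above.
raw = '{"Address":"22.79.52.79","Amps":"11.98","AmpsLimit":"90","Chain":"2"}'

my_parsed_json = json.loads(raw)
print(my_parsed_json["Amps"])  # note: the values are strings in this JSON
```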
Please post the error. – javadba
Thanks javadba. I've added the error, but I think it must be about how the RDD objects are referenced, rather than the DStreams. Like foreachRDD() in Scala. – jcalbo
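A sketch along the lines of that comment, assuming the pyspark 1.5 DStream API: the error ("No output operations registered") means the transformed stream needs an output operation such as pprint() or foreachRDD() registered before ssc.start(). The parse helper below is plain Python; the Spark wiring needs a live cluster, so it is shown as comments:

```python
import json

def parse_records(records):
    # Apply json.loads to each raw Kafka value in a partition.
    return [json.loads(r) for r in records]

# Untested wiring for the streaming job above:
#   my_parsed_json = dstream.mapPartitions(parse_records)
#   # foreachRDD registers an output operation, which avoids
#   # "No output operations registered, so nothing to execute":
#   my_parsed_json.foreachRDD(lambda rdd: print(rdd.take(5)))
#   ssc.start()
#   ssc.awaitTermination()
```

Keeping dstream.pprint() (or any other output operation) on the transformed stream would serve the same purpose.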