2016-03-03 145 views
0

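Parsing a JSON stream with pyspark

I am very new to Spark Streaming, and I am trying to read and parse a JSON stream from Kafka using pyspark. Reading the stream works, and I can also pprint() the RDD. Each message looks like this: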

{"Address":"22.79.52.79","AlarmProfile":"-1","Amps":"11.98","AmpsLimit":"90","AssetTag":"-1","AssetTag_1":"-1","Blank":"0","CAN":"0","Chain":"2","Config":"\u003cUnknown\u003e",...,"WattsLimit":"-1"} 

I want to parse the JSON so that I can use, for example, my_parsed_json["Amps"].

But I don't know how to apply json.loads() to the messages in the stream.
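For a single message, the parsing step itself can be tried outside Spark. The following is a minimal sketch, assuming a shortened copy of the sample message above (most fields are omitted; the variable names `raw`, `record` and `amps` are illustrative only):

```python
import json

# Shortened copy of the sample message from the question (most fields omitted).
raw = '{"Address":"22.79.52.79","Amps":"11.98","AmpsLimit":"90","Chain":"2"}'

record = json.loads(raw)      # JSON text -> Python dict
amps = float(record["Amps"])  # values arrive as strings, so convert explicitly

print(record["Address"], amps)
```

Every value in the sample message is a quoted string, so numeric fields such as "Amps" need an explicit conversion before any arithmetic.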

I run the script like this:

/data/spark/bin/spark-submit --master spark://des01:7077 --total-executor-cores 2 --jars /data/dev/2.10/spark-streaming-kafka-assembly_2.10-1.5.2.jar test.py pkb01:9092 topicname 

where "pkb01:9092" is the Kafka broker and "topicname" is the Kafka topic.

My Python code:

from __future__ import print_function 

import sys 
import json 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

# sc is the SparkContext

sc = SparkContext(appName="mitest") 
ssc = StreamingContext(sc, 2) 

brokers, topico = sys.argv[1:] 
kvs = KafkaUtils.createDirectStream(ssc, [topico], {"metadata.broker.list": brokers}) 

dstream = kvs.map(lambda x: x[1]) 

dstream.pprint() 

I would like to have something like this:

my_parsed_json = dstream.map(lambda x: json.loads(x)) 

but I get an error from Spark. Any help?

The error says:

Traceback (most recent call last): 
    File "/home/spark/test.py", line 28, in <module> 
    ssc.start() 
    File "/data/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 237, in start 
    File "/data/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/data/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
    py4j.protocol.Py4JJavaError: An error occurred while calling o21.start. 
    : java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
      at scala.Predef$.require(Predef.scala:233) 
      at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
      at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:551) 
      at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:609) 
      at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:608) 
      at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:623) 
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
      at java.lang.reflect.Method.invoke(Method.java:606) 
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
      at py4j.Gateway.invoke(Gateway.java:259) 
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
      at py4j.commands.CallCommand.execute(CallCommand.java:79) 
      at py4j.GatewayConnection.run(GatewayConnection.java:207) 
      at java.lang.Thread.run(Thread.java:745) 
+1

Please post the error. – javadba

+0

Thanks, javadba. I added the error, but I think the problem must be the way I reference RDD objects rather than DStreams, as with foreachRDD() in Scala. – jcalbo

Answers

2

You need to call one of the operations below:

https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html

Transformation Meaning 
map(func) Return a new DStream by passing each element of the source DStream through a function func. 
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items. 
filter(func) Return a new DStream by selecting only the records of the source DStream on which func returns true. 
repartition(numPartitions) Changes the level of parallelism in this DStream by creating more or fewer partitions. 
count() Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. 
reduce(func) Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. 
countByValue() When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. 

...and so on.

One or more of these needs to be called on your dstream.
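Note that the exception text asks for an output operation. In the Spark Streaming guide linked above, the entries in the table are transformations, which are lazy. Output operations such as pprint(), saveAsTextFiles() and foreachRDD() are what register work to execute. Presumably the error appeared because the map() call replaced the only pprint(). A minimal sketch, assuming the question's setup and Spark 1.5.x Kafka API (the function names `handle_rdd`, `build_pipeline` and `run` are hypothetical):

```python
import json


def handle_rdd(rdd):
    """Called on the driver once per batch; collect() pulls the batch to the driver."""
    for record in rdd.collect():
        print(record.get("Amps"))


def build_pipeline(kvs):
    """Attach JSON parsing and two output operations to a Kafka DStream."""
    parsed = kvs.map(lambda kv: json.loads(kv[1]))  # transformation: lazy
    parsed.pprint()                # output operation: prints the first elements of each batch
    parsed.foreachRDD(handle_rdd)  # output operation: custom per-batch handling
    return parsed


def run(brokers, topic):
    # pyspark imports are kept local so the helpers above load without Spark installed.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils  # same module as in the question

    sc = SparkContext(appName="mitest")
    ssc = StreamingContext(sc, 2)
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    build_pipeline(kvs)
    ssc.start()
    ssc.awaitTermination()
```

In a script submitted as in the question, the entry point would be something like run(*sys.argv[1:]). Either output operation alone is enough to satisfy the "No output operations registered" check; collect() inside handle_rdd is only reasonable for small batches.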

+0

Thanks for the great summary of the options. – disruptive

2

Why not do it like this:

dstream = kvs.map(lambda x: json.loads(x[1])) 

dstream.pprint()
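One caveat with the one-liner above: json.loads() raises an exception on a malformed message, which would fail the task processing that batch. A possible variation, sketched here under the assumption that bad messages can simply be dropped, uses flatMap with a helper (the name `parse_or_skip` is hypothetical):

```python
import json


def parse_or_skip(kv):
    """Return [parsed_value] for a valid JSON message, [] for anything else.

    kv is a (key, value) pair as produced by KafkaUtils.createDirectStream.
    """
    try:
        return [json.loads(kv[1])]
    except (TypeError, ValueError):  # malformed JSON raises ValueError; None raises TypeError
        return []


# With the question's stream this would be used as:
#   dstream = kvs.flatMap(parse_or_skip)
#   dstream.pprint()
```

Because flatMap flattens the returned lists, valid messages pass through as parsed values and invalid ones disappear from the stream.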