2017-07-29 101 views
0

我需要 pyspark 方面的幫助。我正在從 Kafka 流式傳輸 JSON 數據,需要在 pyspark 中將其轉換爲 DataFrame。爲了接收流數據,我使用了下面的代碼。(標題:Spark Streaming——在 pyspark 中將 JSON 數據轉換爲 DataFrame)

from __future__ import print_function 
import sys 
import csv 
import json 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 
from pyspark.sql import SparkSession 
from pyspark.sql import SQLContext 
from pyspark.sql import Row 
import pandas as pd 
# NOTE: ``gspark`` is the SparkSession created in the __main__ block below.
# (The original had ``global gspark`` at module level, which is a no-op:
# ``global`` only has meaning inside a function body.)
def convert_Json2DF(time, rdd):
    """foreachRDD callback: load one batch of JSON strings into a DataFrame.

    Parameters
    ----------
    time : batch timestamp supplied by Spark Streaming.
    rdd : RDD[str]
        The JSON documents received in this batch.
    """
    # Streaming delivers empty RDDs between messages; skip them to avoid
    # schema-inference errors on empty input.
    if rdd.isEmpty():
        return
    # spark.read.json(rdd) already returns a DataFrame, so the original's
    # extra ``.toDF()`` call was redundant (and obscured the real failure).
    nf = gspark.read.json(rdd)
    nf.show()
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    # Module-level name so convert_Json2DF can reach the session.
    gspark = SparkSession \
        .builder \
        .appName("SparkSteaming Kafka Receiver") \
        .config("spark.some.config.option", "some-value") \
        .config("spark.ui.port", 22300) \
        .config("spark.executor.instances", 4) \
        .config("spark.executor.cores", 4) \
        .getOrCreate()
    sc = gspark.sparkContext
    # Renamed: the original assigned the instance to the name ``SQLContext``,
    # shadowing the imported class.
    sql_context = SQLContext(sc)
    ssc = StreamingContext(sc, 15)  # 15-second batch interval
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    # Python 3 removed tuple parameters in lambdas (PEP 3113); the original
    # ``lambda (key, value): ...`` is a SyntaxError. Index the pair instead.
    lines = kvs.map(lambda kv: json.loads(kv[1]))
    lines.pprint()
    # The original passed the undefined name ``Json2DF`` here (NameError);
    # the defined callback is convert_Json2DF.
    lines.foreachRDD(convert_Json2DF)
    # start/awaitTermination belong inside the __main__ guard; at module
    # level they would run on import.
    ssc.start()
    ssc.awaitTermination()

對於上述代碼,我無法將json數據轉換爲數據框。任何人都可以糾正我在哪裏我需要做的變化,在Json2DF功能或主要功能。

感謝 巴拉

回答

1

首先,確保所有 JSON 數據具有相同的架構。

def check_json(js, col):
    """Parse a JSON document and extract the values named in *col*.

    Parameters
    ----------
    js : str
        A JSON document (expected to be an object/dict).
    col : list[str]
        Keys to pull out, in order.

    Returns
    -------
    list
        ``[js[c] for c in col]`` with missing keys as ``None``, or ``[]``
        when *js* is not valid JSON or not a JSON object (so downstream
        ``filter`` can drop it).
    """
    try:
        data = json.loads(js)
        return [data.get(i) for i in col]
    # Narrowed from a bare ``except:``: json.loads raises ValueError
    # (JSONDecodeError) on bad input, and ``.get`` raises AttributeError
    # when the document is an array/scalar rather than an object.
    except (ValueError, AttributeError):
        return []


def convert_json2df(rdd, col):
    """foreachRDD callback: show one batch of extracted rows as a DataFrame.

    Parameters
    ----------
    rdd : RDD[list]
        Rows produced by check_json (one list of values per record).
    col : list[str]
        Column names for the DataFrame, matching check_json's key order.
    """
    if rdd.isEmpty():
        return
    ss = SparkSession(rdd.context)
    # The original passed StructType("based on 'col'"), a placeholder that
    # cannot run. Passing the list of column names lets Spark pair the
    # names with inferred value types.
    df = ss.createDataFrame(rdd, schema=col)
    df.show()


# Keys to extract from each JSON record; also used as DataFrame column names.
cols = ['idx', 'name']

sc = SparkContext()
ssc = StreamingContext(sc, 5)  # 5-second batch interval

# foreachRDD is a terminal action that returns None, so the original's
# ``lines = ...`` bound a meaningless None; don't name the chain.
ssc.socketTextStream('localhost', 9999) \
    .map(lambda x: check_json(x, cols)) \
    .filter(lambda x: x) \
    .foreachRDD(lambda rdd: convert_json2df(rdd, cols))

ssc.start()
ssc.awaitTermination()