2017-07-14 120 views
3

我試圖從Kinesis處理Json字符串。 Json字符串可以有幾種不同的形式。從室壁運動,我創建了一個DSTREAM:Spark Streaming Scala結合不同結構的json形成一個DataFrame

val kinesisStream = KinesisUtils.createStream(
ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com", 
"region", InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) 

val lines = kinesisStream.map(x => new String(x)) 

lines.foreachRDD((rdd, time) =>{ 

    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
    import sqlContext.implicits.StringToColumn 

    if(rdd.count() > 0){ 
    // Process jsons here 
    // Json strings here would have either one of the formats below 
    } 
}) 

的RDD串會有這些JSON字符串中的任何一個。 收藏:

[ 
    { 
    "data": { 
     "ApplicationVersion": "1.0.3 (65)", 
     "ProjectId": 30024, 
     "TargetId": "4138", 
     "Timestamp": 0 
    }, 
    "host": "host1" 
    }, 
    { 
    "data": { 
     "ApplicationVersion": "1.0.3 (65)", 
     "ProjectId": 30025, 
     "TargetId": "4139", 
     "Timestamp": 0 
    }, 
    "host": "host1" 
    } 
] 

和一些Json的字符串,如單對象,以便:

{ 
     "ApplicationVersion": "1.0.3 (65)", 
     "ProjectId": 30026, 
     "TargetId": "4140", 
     "Timestamp": 0 
} 

我希望能夠從「數據」提取對象的關鍵,如果它是第一種類型的JSON字符串並結合第二種類型的Json並形成一個RDD/DataFrame,我該如何實現這一點?

最後,我想我的數據幀是這樣的:

+------------------+---------+--------+---------+ 
|ApplicationVersion|ProjectId|TargetId|Timestamp| 
+------------------+---------+--------+---------+ 
|  1.0.3 (65)| 30024| 4138|  0| 
|  1.0.3 (65)| 30025| 4139|  0| 
|  1.0.3 (65)| 30026| 4140|  0| 
+------------------+---------+--------+---------+ 

對不起,新斯卡拉和火花。我一直在尋找現有的例子,但不幸的是沒有找到解決方案。

非常感謝提前。

回答

0

本例使用json4s

import org.json4s._ 
import org.json4s.jackson.JsonMethods._ 

implicit val format = DefaultFormats 

case class jsonschema (ApplicationVersion: String, ProjectId: String, TargetId: String, Timestamp:Int) 

val string1 = """ 
[ { 
    "data" : { 
    "ApplicationVersion" : "1.0.3 (65)", 
    "ProjectId" : 30024, 
    "TargetId" : "4138", 
    "Timestamp" : 0 
    }, 
    "host" : "host1" 
}, { 
    "data" : { 
    "ApplicationVersion" : "1.0.3 (65)", 
    "ProjectId" : 30025, 
    "TargetId" : "4139", 
    "Timestamp" : 0 
    }, 
    "host" : "host1" 
} ] 

""" 

val string2 = """ 
[ { 
    "ApplicationVersion" : "1.0.3 (65)", 
    "ProjectId" : 30025, 
    "TargetId" : "4140", 
    "Timestamp" : 0 
}, { 
    "ApplicationVersion" : "1.0.3 (65)", 
    "ProjectId" : 30025, 
    "TargetId" : "4141", 
    "Timestamp" : 0 
} ] 
""" 

val json1 = (parse(string1) \ "data").extract[List[jsonschema]] 

val json2 = parse(string2).extract[List[jsonschema]] 

val jsonRDD = json1.union(json2) 

val df = sqlContext.createDataFrame(jsonRDD) 

df.show 


+------------------+---------+--------+---------+ 
|ApplicationVersion|ProjectId|TargetId|Timestamp| 
+------------------+---------+--------+---------+ 
|  1.0.3 (65)| 30024| 4138|  0| 
|  1.0.3 (65)| 30025| 4139|  0| 
|  1.0.3 (65)| 30025| 4140|  0| 
|  1.0.3 (65)| 30025| 4141|  0| 
+------------------+---------+--------+---------+ 
+0

感謝您的快速響應!對不起,我忘了提及我正在使用Spark Streaming DStreams,我已經更新了我的問題。你的迴應仍然有幫助! – j3tr1

+0

如果你能夠從你的DStream中提取字符串,代碼應該或多或少的工作。 – philantrovert

+0

謝謝!這通過使用json4s指出了我的正確方向。這允許我在轉換爲DF之前先處理json字符串 – j3tr1

0

你可以從第一Dataframe選擇data.*列後用工會:

val spark = SparkSession.builder().master("local[*]").getOrCreate()  
val sc = spark.sparkContext 

// Assuming you store your jsons in two separate strings `json1` and `json2` 
val df1 = spark.read.json(sc.parallelize(Seq(json1))) 
val df2 = spark.read.json(sc.parallelize(Seq(json2))) 

import spark.implicits._ 
df1.select($"data.*") // Select only the data columns from first Dataframe 
    .union(df2)   // Union the two Dataframes as they have the same structure 
    .show() 

EDIT [其他解決方案鏈接]

後您編輯您的問題,我理解你需要某種形式的解析JSON文件時的回退機制。有更多的方法可以使用任何JSON解析庫來做到這一點 - 有一個很好的解決方案here與Play,我認爲它已經解釋瞭如何以優雅的方式解決這個問題。

一旦你有一個RDD[Data]其中數據是你的「變種」類型,你可以簡單地將它轉換成Dataframe使用rdd.toDF()

希望有所幫助。

+0

感謝的快速反應安德烈,我很欣賞它,是非常有用的!對不起,我忘了提及我正在使用Spark Streaming DStreams,我已經更新了上面的問題。 – j3tr1

+0

我明白了。這是一個簡單的方法來知道哪個對象來的時候? –

+0

不幸的是沒有 – j3tr1