2

我已經開始從Spark引擎學習Spark流,並且很新的數據分析和火花。我只是想創建一個小IOT應用程序,我想要預測未來的數據。Java Spark Streaming JSON解析

我有TIVA硬件,它發送實時傳感器JSON數據如下,

[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}] 

在此T是其中數據被張貼UNIX時間戳。 傳感器是一組傳感器,每個傳感器('s')的數據都是'd'。

我想要做的就是消費這些數據並創建對象,然後使用Spark的Mlib(機器學習)或等效庫來預測未來數據。

我希望有一個總體思路,這是否將有可能與所有的技術選擇

  1. 我已經決定使用?
  2. 如何使用嵌套的JSON?我嘗試使用SQLContext但沒有成功。
  3. 一般準則,以實現我在這裏要做的。

這是我用來使用來自KAFKA的消息的代碼。 PS:我想在Java中這樣做,以保持線性和良好的性能。

+0

你能後的代碼,你嘗試過什麼到目前爲止?它可以使用Spark SQL和Streaming。 – Shankar

+0

發佈代碼有問題。 –

+0

當您嘗試'sqlContext'來讀取json字符串時,您遇到了什麼問題?該任務不是可序列化的問題? – Shankar

回答

2

回答您的問題:

1)這是否將有可能與所有的技術選擇,我決定使用?

`Ans: Yes it can be done and quiet a normal use-case for spark.` 

2)我該如何使用嵌套的JSON?我嘗試使用SQLContext但沒有成功。

`Ans: Nested JSON with SQLContext is little tricky. You may want to use Jackson or some other JSON library.` 

3)一般準則,以實現我在這裏要做的。

Ans: Consuming messages through kafka seems fine, but only a limited machine learning algorithms are supported through streaming.

如果你想使用其他機器學習算法或第三方庫,也許你應該考慮的模型創建作爲批處理作業emmiting出在最後的模型。流式作業應該加載模型並獲取數據流並僅預測。

+0

你能指導我正確使用這種用例嗎?這將是非常有幫助 –

4

由於您使用SPARK 2.0,從SparkSession,就可以讀取JSON

json.foreachRDD(rdd -> { 

     DataFrame df= spark.read.json(rdd) 
     //process json with this DF. 
} 

也可以將RDD轉換成排的RDD,那麼你可以使用createDataFrame方法。

json.foreachRDD(rdd -> { 

      DataFrame df= spark.createDataFrame(rdd); 
      //process json with this DF. 
    } 

嵌套JSON處理可能從DF,您可以按照this文章。

而且,一旦您將您的JSON來DF,您可以在任何火花模塊使用它(像火花SQL,ML)

+0

在我的情況下,我嘗試使用的SQLContext構造函數已被棄用。我沒有得到如何使用'JavaSparkContext'獲得'sc'(SparkContext) –

+0

@RahulBorkar:您可以將'JavaSparkContext'傳遞給SQLContext(javasparkContext) – Shankar

+0

它在Spark 2.11中不贊成使用。另外,在嘗試代碼時,我得到「方法轉換(函數,JavaRDD >)對於JavaDStream類型不明確」 –