2017-08-08

How to get the lag of a column in a Spark streaming DataFrame?

I have data streaming into my Spark Scala application in this format:

id mark1 mark2 mark3 time 
uuid1 100 200 300 Tue Aug 8 14:06:02 PDT 2017 
uuid1 100 200 300 Tue Aug 8 14:06:22 PDT 2017 
uuid2 150 250 350 Tue Aug 8 14:06:32 PDT 2017 
uuid2 150 250 350 Tue Aug 8 14:06:52 PDT 2017 
uuid2 150 250 350 Tue Aug 8 14:06:58 PDT 2017 

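I read it into the columns id, mark1, mark2, mark3 and time, and the time is converted to a datetime format. I want to group this by id and get the lag of mark1, i.e. the mark1 value of the previous row. Something like this: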

id mark1 mark2 mark3 prev_mark time 
uuid1 100 200 300 null  Tue Aug 8 14:06:02 PDT 2017 
uuid1 100 200 300 100  Tue Aug 8 14:06:22 PDT 2017 
uuid2 150 250 350 null  Tue Aug 8 14:06:32 PDT 2017 
uuid2 150 250 350 150  Tue Aug 8 14:06:52 PDT 2017 
uuid2 150 250 350 150  Tue Aug 8 14:06:58 PDT 2017 
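The requested prev_mark is an ordinary lag by one row within each id, ordered by time. Here is a plain-Scala sketch of just those semantics (no Spark involved), with times simplified to seconds after 14:06:00 and `Option` standing in for null; `MarkRow` and `LagSketch` are hypothetical names:

```scala
// Hypothetical row type mirroring the question's columns; `time` is
// simplified to seconds after 14:06:00 instead of the full date string.
case class MarkRow(id: String, mark1: Int, mark2: Int, mark3: Int, time: Long)

object LagSketch {
  // Pairs each row with prev_mark: mark1 of the previous row with the same
  // id (ordered by time), or None for the first row of that id.
  def withPrevMark(rows: Seq[MarkRow]): Seq[(MarkRow, Option[Int])] =
    rows
      .groupBy(_.id)   // plays the role of partitionBy("id")
      .toSeq
      .sortBy(_._1)    // deterministic output order by id
      .flatMap { case (_, group) =>
        val sorted = group.sortBy(_.time) // plays the role of orderBy("time")
        // lag by one: shift mark1 down one row, None fills the first slot
        val prevs: Seq[Option[Int]] = None +: sorted.init.map(r => Some(r.mark1))
        sorted.zip(prevs)
      }
}
```

Applied to the five sample rows, this yields None, 100, None, 150, 150 as prev_mark, matching the table above.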

Say the DataFrame is markDF. I have tried:

val window = Window.partitionBy("uuid").orderBy("timestamp")
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

It says that non-time-based windows are not supported on streaming DataFrames/Datasets.

I have also tried:

val window = Window.partitionBy("uuid").orderBy("timestamp").rowsBetween(-10, 10)
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

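to get a window over a few rows, but that doesn't work either. The streaming window, window("timestamp", "10 minutes"), can't be used to compute a lag. I'm confused about how to do this. Any help would be great!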


Do you want the "lag" per batch, or across all the streaming data? –

Answer


I suggest changing the time column to a String, as in:

+-----+-----+-----+-----+---------------------------+
|id   |mark1|mark2|mark3|time                       |
+-----+-----+-----+-----+---------------------------+
|uuid1|100  |200  |300  |Tue Aug 8 14:06:02 PDT 2017|
|uuid1|100  |200  |300  |Tue Aug 8 14:06:22 PDT 2017|
|uuid2|150  |250  |350  |Tue Aug 8 14:06:32 PDT 2017|
|uuid2|150  |250  |350  |Tue Aug 8 14:06:52 PDT 2017|
|uuid2|150  |250  |350  |Tue Aug 8 14:06:58 PDT 2017|
+-----+-----+-----+-----+---------------------------+

root 
|-- id: string (nullable = true) 
|-- mark1: integer (nullable = false) 
|-- mark2: integer (nullable = false) 
|-- mark3: integer (nullable = false) 
|-- time: string (nullable = true) 

After that, the following should work:

df.withColumn("prev_mark", lag("mark1", 1).over(Window.partitionBy("id").orderBy("time"))) 

which gives you the output:

+-----+-----+-----+-----+---------------------------+---------+
|id   |mark1|mark2|mark3|time                       |prev_mark|
+-----+-----+-----+-----+---------------------------+---------+
|uuid1|100  |200  |300  |Tue Aug 8 14:06:02 PDT 2017|null     |
|uuid1|100  |200  |300  |Tue Aug 8 14:06:22 PDT 2017|100      |
|uuid2|150  |250  |350  |Tue Aug 8 14:06:32 PDT 2017|null     |
|uuid2|150  |250  |350  |Tue Aug 8 14:06:52 PDT 2017|150      |
|uuid2|150  |250  |350  |Tue Aug 8 14:06:58 PDT 2017|150      |
+-----+-----+-----+-----+---------------------------+---------+
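One caveat with ordering by the raw string: it compares text, not time, so for example "Mon Aug 14 09:00:00 PDT 2017" sorts before "Tue Aug 8 14:06:02 PDT 2017". A minimal sketch of turning such a string into a sortable epoch-seconds value with java.time, assuming the format shown in the tables and English day/month names (`TimeParseSketch` is a hypothetical name):

```scala
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale

object TimeParseSketch {
  // Assumed pattern for strings like "Tue Aug 8 14:06:02 PDT 2017":
  // day name, month, day of month, time, zone abbreviation, year.
  private val fmt =
    DateTimeFormatter.ofPattern("EEE MMM d HH:mm:ss zzz yyyy", Locale.US)

  // Epoch seconds sort in true chronological order, unlike the raw text.
  def toEpochSeconds(s: String): Long =
    ZonedDateTime.parse(s, fmt).toEpochSecond
}
```

In Spark itself, the same idea would mean ordering by a parsed timestamp column (for instance one built with `unix_timestamp` and a matching pattern) instead of the string.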

This won't work, because my DataFrame is a streaming DataFrame. – PiKaY


Why do you think so? –


The over function doesn't support non-time-based windows on a streaming DataFrame. – PiKaY
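One common workaround on Spark 2.2+ is arbitrary stateful processing (`groupByKey(...).flatMapGroupsWithState(...)`), keeping the last mark1 per id as state so the lag carries across micro-batches. The sketch below models only that per-id logic in plain Scala so it runs without Spark: the mutable map is a hypothetical stand-in for Spark's `GroupState`, and `StreamRow`, `MarkWithPrev` and `StatefulLag` are illustrative names. It assumes rows for a given id arrive in time order across batches; late or out-of-order rows would need extra handling.

```scala
import scala.collection.mutable

// Hypothetical row types; only the columns needed for the lag are kept.
case class StreamRow(id: String, mark1: Int, time: Long)
case class MarkWithPrev(id: String, mark1: Int, prevMark: Option[Int], time: Long)

// Remembers the last mark1 seen for each id so that the lag carries over
// from one micro-batch to the next.
class StatefulLag {
  // Stand-in for Spark's per-key GroupState (assumption: one Int per id).
  private val lastMark = mutable.Map.empty[String, Int]

  def processBatch(batch: Seq[StreamRow]): Seq[MarkWithPrev] =
    batch.groupBy(_.id).toSeq.sortBy(_._1).flatMap { case (id, rows) =>
      // Start from the value left by earlier batches (None if the id is new).
      var prev: Option[Int] = lastMark.get(id)
      // Sort explicitly; a group's rows are not assumed to arrive ordered.
      val out = rows.sortBy(_.time).map { r =>
        val row = MarkWithPrev(r.id, r.mark1, prev, r.time)
        prev = Some(r.mark1)
        row
      }
      prev.foreach(p => lastMark(id) = p) // update state for the next batch
      out
    }
}
```

In Spark, the body of the per-group closure would go inside the function passed to `flatMapGroupsWithState`, with `GroupState`'s `getOption` and `update` replacing the map lookup and write.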