2017-06-06 77 views
1

我有以下結構快行間計算

groupId | time | value 
1   0  2 
1   1  1 
1   2  4 
2   0  6 
2   1  2 

的CSV文件(> 3GB),並希望增加一列(值t-1),包含值 - 在同一組中 - 的「行」早時間步驟的:

groupId | time | value | value t-1 
1   0  2   - 
1   1  1   2 
1   2  4   1 
2   0  6   - 
2   1  2   6 

我想象的昂貴部分是要搜索的前一行。不知何故,它似​​乎是一個沒有減少地圖減少的工作 - 如果這是有道理的。但據我所知,我不能確定同一工作人員擁有同一組的所有數據。

Spark是否是正確的工具?

我的最佳替代的解決方案是分割文件分成多個文件(每組一個),只是運行排序並依次Python腳本的多個實例增加了值t-1值

+0

你要做到這幾個文件?如果你只需要修復一個文件,那麼做一個簡單的for循環,然後等待(可能很多)來解析你的3GB數據 – JBernardo

+0

@JBernardo現在我只需要做一次,但是有可能會變成這樣更常用的用例。這個循環是在1到2天的球場中的某個地方 - 只是看到只有一個繁忙的核心而感到傷心...... – bam

回答

2

這可以是通過使用如下所示的火花窗口功能來實現。

import org.apache.spark.sql.expressions.Window 

val df = Seq((1,0,2), (1,1,1), (1,2,4), (2,0,6), (2,1,2)).toDF("groupId", "time", "value") 

val result = df.withColumn("value_t-1", sum($"value").over(Window.partitionBy("groupId").orderBy("time").rowsBetween(-1,-1))) 

輸出:

scala> result.show() 
+-------+----+-----+---------+ 
|groupId|time|value|value_t-1| 
+-------+----+-----+---------+ 
|  1| 0| 2|  null| 
|  1| 1| 1|  2| 
|  1| 2| 4|  1| 
|  2| 0| 6|  null| 
|  2| 1| 2|  6| 
+-------+----+-----+---------+ 

Python版本

>>> from pyspark.sql.window import Window 
>>> import pyspark.sql.functions as func 
>>> df = spark.createDataFrame([(1,0,2), (1,1,1), (1,2,4), (2,0,6), (2,1,2)], ["groupId", "time", "value"]) 
>>> result = df.withColumn("value_t-1", func.sum(df.value).over(Window.partitionBy(df.groupId).orderBy(df.time).rowsBetween(-1,-1))) 
>>> result.show() 
+-------+----+-----+---------+ 
|groupId|time|value|value_t-1| 
+-------+----+-----+---------+ 
|  1| 0| 2|  null| 
|  1| 1| 1|  2| 
|  1| 2| 4|  1| 
|  2| 0| 6|  null| 
|  2| 1| 2|  6| 
+-------+----+-----+---------+ 
+0

謝謝!在一臺機器上(8個內核)在不到25分鐘的時間內處理完整個文件(9個這樣的列) – bam