
Spark SQL DataFrame transformation involving partitioning and lag

I'd like to transform a Spark SQL DataFrame like this:

animal value 
------------ 
cat  8 
cat  5 
cat  6 
dog  2 
dog  4 
dog  3 
rat  7 
rat  4 
rat  9 

into a DataFrame like this:

animal value previous-value 
----------------------------- 
cat  8    0 
cat  5    8 
cat  6    5 
dog  2    0 
dog  4    2 
dog  3    4 
rat  7    0 
rat  4    7 
rat  9    4 

Essentially, I want to partition by animal, then for each animal have previous-value be value lagged by one row (with a default of 0), and then put the partitions back together.


I don't have time to try it right now, but: 1) don't rely on the DataFrame's ordering; add an explicit 'index' column, and 2) try 'repartition'ing by 'animal' and then using 'mapPartitions' to do your row offsets. It probably won't be pretty –
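
For what it's worth, a rough sketch of what that comment describes might look like the following. This is my own reconstruction, not something the commenter wrote; it assumes Spark 2.x and a DataFrame df with an integer value column. The window-function answer below is the cleaner route.

import org.apache.spark.sql.functions.{col, monotonically_increasing_id}
import spark.implicits._

// Tag rows with an explicit index first, since DataFrame order is not guaranteed.
val indexed = df.withColumn("index", monotonically_increasing_id())

val withPrev = indexed
  .repartition(col("animal"))              // co-locate each animal's rows
  .rdd
  .mapPartitions { rows =>
    rows.toSeq
      .groupBy(_.getAs[String]("animal"))  // a partition can hold several animals
      .iterator
      .flatMap { case (animal, group) =>
        val values = group.sortBy(_.getAs[Long]("index")).map(_.getAs[Int]("value"))
        // pair each value with its predecessor, defaulting to 0
        values.zip(0 +: values.init).map { case (v, prev) => (animal, v, prev) }
      }
  }
  .toDF("animal", "value", "previous-value")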

Answers


This can be done using a window function.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag   // imported automatically in spark-shell
import sqlContext.implicits._

val df = sc.parallelize(Seq(
  ("cat", 8, "01:00"), ("cat", 5, "02:00"), ("cat", 6, "03:00"),
  ("dog", 2, "02:00"), ("dog", 4, "04:00"), ("dog", 3, "06:00"),
  ("rat", 7, "01:00"), ("rat", 4, "03:00"), ("rat", 9, "05:00")
)).toDF("animal", "value", "time")

df.show 
+------+-----+-----+ 
|animal|value| time| 
+------+-----+-----+ 
| cat| 8|01:00| 
| cat| 5|02:00| 
| cat| 6|03:00| 
| dog| 2|02:00| 
| dog| 4|04:00| 
| dog| 3|06:00| 
| rat| 7|01:00| 
| rat| 4|03:00| 
| rat| 9|05:00| 
+------+-----+-----+ 

I've added a "time" field to illustrate the orderBy.

val w1 = Window.partitionBy($"animal").orderBy($"time") 

val previous_value = lag($"value", 1).over(w1) 
val df1 = df.withColumn("previous", previous_value) 

df1.show 
+------+-----+-----+--------+             
|animal|value| time|previous| 
+------+-----+-----+--------+ 
| dog| 2|02:00| null| 
| dog| 4|04:00|  2| 
| dog| 3|06:00|  4| 
| cat| 8|01:00| null| 
| cat| 5|02:00|  8| 
| cat| 6|03:00|  5| 
| rat| 7|01:00| null| 
| rat| 4|03:00|  7| 
| rat| 9|05:00|  4| 
+------+-----+-----+--------+ 
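
As an aside, the same window computation can be written in Spark SQL directly. This is my equivalent sketch, not part of the original answer; it assumes a Spark 2.x spark session as used in the second answer below. Note that SQL's LAG takes its default value inline:

df.createOrReplaceTempView("animals")

// LAG(value, 1, 0): previous value per animal, 0 when there is none
val df1Sql = spark.sql("""
  SELECT animal, value, time,
         LAG(value, 1, 0) OVER (PARTITION BY animal ORDER BY time) AS previous
  FROM animals
""")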

If you want to replace the null values with 0:

val df2 = df1.na.fill(0) 
df2.show 
+------+-----+-----+--------+ 
|animal|value| time|previous| 
+------+-----+-----+--------+ 
| dog| 2|02:00|  0| 
| dog| 4|04:00|  2| 
| dog| 3|06:00|  4| 
| cat| 8|01:00|  0| 
| cat| 5|02:00|  8| 
| cat| 6|03:00|  5| 
| rat| 7|01:00|  0| 
| rat| 4|03:00|  7| 
| rat| 9|05:00|  4| 
+------+-----+-----+--------+ 
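
Alternatively, lag accepts a default value as a third argument, which folds the na.fill step into the window expression itself:

// same result as df2 above: 0 wherever there is no previous row
val previous_value = lag($"value", 1, 0).over(w1)
val df3 = df.withColumn("previous", previous_value)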

I'd be careful ordering by a 'String'; I'm fairly sure Spark will sort them lexicographically (so '12' would sort before '2', which may not be what you want). –


'scala> "12" < "2" res0: Boolean = true' –


Good point, @evan058. In real life, I would use timestamps. – sjstanley
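
For illustration (a hypothetical tweak on this answer, not something the poster wrote), the lexicographic-ordering concern could also be sidestepped by parsing the time strings before ordering:

import org.apache.spark.sql.functions.unix_timestamp

// order numerically by the parsed time rather than lexicographically by the string
val w1 = Window.partitionBy($"animal").orderBy(unix_timestamp($"time", "HH:mm"))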


Here is another approach. This piece of code will work:

import org.apache.spark.sql.functions.collect_list   // imported automatically in spark-shell
import spark.implicits._

val df = spark.read.format("CSV").option("header", "true").load("/home/shivansh/Desktop/foo.csv")
val df2 = df.groupBy("animal").agg(collect_list("value") as "listValue")
val desiredDF = df2.rdd.flatMap { row =>
  val animal = row.getAs[String]("animal")
  val valueList = row.getAs[Seq[String]]("listValue").toList
  // zip each value with its predecessor, prepending "0" as the default
  val newList = valueList zip ("0" :: valueList)
  newList.map(a => (animal, a._1, a._2))
}.toDF("animal", "value", "previousValue")

In the Spark shell:

scala> val df=spark.read.format("CSV").option("header","true").load("/home/shivansh/Desktop/foo.csv") 
df: org.apache.spark.sql.DataFrame = [animal: string, value: string] 

scala> df.show() 
+------+-----+ 
|animal|value| 
+------+-----+ 
| cat| 8| 
| cat| 5| 
| cat| 6| 
| dog| 2| 
| dog| 4| 
| dog| 3| 
| rat| 7| 
| rat| 4 | 
| rat| 9| 
+------+-----+ 


scala> val df2=df.groupBy("animal").agg(collect_list("value") as "listValue") 
df2: org.apache.spark.sql.DataFrame = [animal: string, listValue: array<string>] 

scala> df2.show() 
+------+----------+ 
|animal| listValue| 
+------+----------+ 
| rat|[7, 4 , 9]| 
| dog| [2, 4, 3]| 
| cat| [8, 5, 6]| 
+------+----------+ 


scala> val desiredDF=df2.rdd.flatMap{row=> 
    | val animal=row.getAs[String]("animal") 
    | val valueList=row.getAs[Seq[String]]("listValue").toList 
    | val newlist=valueList zip "0"::valueList 
    | newlist.map(a=>(animal,a._1,a._2)) 
    | }.toDF("animal","value","previousValue") 
desiredDF: org.apache.spark.sql.DataFrame = [animal: string, value: string ... 1 more field] 

scala> desiredDF.show() 
+------+-----+-------------+              
|animal|value|previousValue| 
+------+-----+-------------+ 
| rat| 7|   0| 
| rat| 4 |   7| 
| rat| 9|   4 | 
| dog| 2|   0| 
| dog| 4|   2| 
| dog| 3|   4| 
| cat| 8|   0| 
| cat| 5|   8| 
| cat| 6|   5| 
+------+-----+-------------+
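
One caveat about this approach (my note, not the answerer's): the element order that collect_list produces after a groupBy is not guaranteed. If the data carried an ordering column, such as the time field from the first answer, each list could be sorted deterministically before zipping, along these lines:

import org.apache.spark.sql.functions.{collect_list, sort_array, struct}
import spark.implicits._

// Hypothetical variant: collect (time, value) pairs and sort each list by time,
// so the zip-with-predecessor step sees a deterministic order.
val df2 = df.groupBy("animal")
  .agg(sort_array(collect_list(struct($"time", $"value"))) as "pairs")
  .select($"animal", $"pairs.value" as "listValue")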