2015-08-21 81 views
5

我想比較兩個連續的行ii-1col2(按col1排序)。如何比較多行?

如果i個行item_iitem_[i-1]_row是不同的,我想用1

+--------------+ 
| col1 col2 | 
+--------------+ 
| row_1 item_1 | 
| row_2 item_1 | 
| row_3 item_2 | 
| row_4 item_1 | 
| row_5 item_2 | 
| row_6 item_1 | 
+--------------+ 

遞增的item_[i-1]計數在上面的例子,如果我們在掃描兩行時間向下,我們看到row_2row_3是不同的,因此我們爲item_1添加一個。接下來,我們看到row_3row_4不同,然後將其添加到item_2。繼續,直到我們結束:

+-------------+ 
| col2 col3 | 
+-------------+ 
| item_1 2 | 
| item_2 2 | 
+-------------+ 

回答

8

您可以使用窗口函數和聚合的組合來做到這一點。窗口函數用於獲取下一個值col2(使用col1進行排序)。然後彙總計算我們遇到差異的時間。這在下面的代碼中實現:

val data = Seq(
    ("row_1", "item_1"), 
    ("row_2", "item_1"), 
    ("row_3", "item_2"), 
    ("row_4", "item_1"), 
    ("row_5", "item_2"), 
    ("row_6", "item_1")).toDF("col1", "col2") 

import org.apache.spark.sql.expressions.Window 
val q = data. 
    withColumn("col2_next", 
    coalesce(lead($"col2", 1) over Window.orderBy($"col1"), $"col2")). 
    groupBy($"col2"). 
    agg(sum($"col2" =!= $"col2_next" cast "int") as "col3") 

scala> q.show 
17/08/22 10:15:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 
+------+----+ 
| col2|col3| 
+------+----+ 
|item_1| 2| 
|item_2| 2| 
+------+----+