2014-11-20 132 views
17

我需要爲包含許多列的數據表生成row_numbers的完整列表。如何獲取Spark RDD的SQL row_number等效項?

在SQL中,這應該是這樣的:

select 
    key_value, 
    col1, 
    col2, 
    col3, 
    row_number() over (partition by key_value order by col1, col2 desc, col3) 
from 
    temp 
; 

現在,讓我們在星火說,我有以下形式的RDD(K,V),其中V =(COL1,COL2,COL3)所以我的條目都喜歡

(key1, (1,2,3)) 
(key1, (1,4,7)) 
(key1, (2,2,3)) 
(key2, (5,5,5)) 
(key2, (5,5,9)) 
(key2, (7,5,5)) 
etc. 

我想用正確的ROW_NUMBER

(key1, (1,2,3), 2) 
(key1, (1,4,7), 1) 
(key1, (2,2,3), 3) 
(key2, (5,5,5), 1) 
(key2, (5,5,9), 2) 
(key2, (7,5,5), 3) 
etc. 
訂購這些使用命令,如sortBy(),sortWith(),sortByKey(),zipWithIndex等,並有一個新的RDD

(我不在乎括號,所以表格也可以是(K,(col1,col2,col3,rownum)))

我該怎麼做?

這是我第一次嘗試:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3)) 

val temp1 = sc.parallelize(sample_data) 

temp1.collect().foreach(println) 

// ((3,4),5,5,5) 
// ((3,4),5,5,9) 
// ((3,4),7,5,5) 
// ((1,2),1,2,3) 
// ((1,2),1,4,7) 
// ((1,2),2,2,3) 

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println) 

// ((((1,2),1,2,3),1),0) 
// ((((1,2),1,4,7),1),1) 
// ((((1,2),2,2,3),1),2) 
// ((((3,4),5,5,5),1),3) 
// ((((3,4),5,5,9),1),4) 
// ((((3,4),7,5,5),1),5) 

// note that this isn't ordering with a partition on key value K! 

val temp2 = temp1.??? 

還要注意的是,功能sortBy不能直接應用於RDD,但必須首先運行收集(),然後將輸出不是RDD,無論是,但數組

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println) 

// ((1,2),1,4,7) 
// ((1,2),1,2,3) 
// ((1,2),2,2,3) 
// ((3,4),5,5,5) 
// ((3,4),5,5,9) 
// ((3,4),7,5,5) 

這裏有一個小更多的進步,但仍然不分區:

val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1)) 

temp2.collect().foreach(println) 

// ((1,2),1,4,7,1) 
// ((1,2),1,2,3,2) 
// ((1,2),2,2,3,3) 
// ((3,4),5,5,5,4) 
// ((3,4),5,5,9,5) 
// ((3,4),7,5,5,6) 
+0

這個問題的其他幾個部分回答問題的延伸,即http://stackoverflow.com/questions/23838614/how-to-sort-an-rdd-in-scala-spark,http://qnalist.com/questions/5086896/spark-sql-how-to-select-first-row-in-each-group -by-group,http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3CD01B658B.2BF52%[email protected]%3E,http://stackoverflow.com/問題/ 270220 59/filter-rdd-based-on-row-number,http://stackoverflow.com/questions/24677180/how-do-i-select-a-range-of-elements-in-spark-rdd – 2014-11-20 22:03:13

+0

I'米也想回答這個問題。 [Hive添加了分析函數(包括0.11中的'row_number()')(https://issues.apache.org/jira/browse/HIVE-896),並且Spark 1.1支持HiveQL/Hive 0.12。所以看起來'sqlContext.hql(「select row_number()over(partition by ...')應該可以,但我得到一個錯誤。 – dnlbrky 2014-11-23 03:52:44

回答

13

row_number() over (partition by ... order by ...)功能已添加到Spark 1.4。這個答案使用PySpark/DataFrames。

創建測試數據框:

from pyspark.sql import Row, functions as F 

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)), 
    Row(k="key1", v=(1,4,7)), 
    Row(k="key1", v=(2,2,3)), 
    Row(k="key2", v=(5,5,5)), 
    Row(k="key2", v=(5,5,9)), 
    Row(k="key2", v=(7,5,5)) 
    ) 
).toDF() 

添加分區的行數:

from pyspark.sql.window import Window 

(testDF 
.select("k", "v", 
     F.rowNumber() 
     .over(Window 
       .partitionBy("k") 
       .orderBy("k") 
      ) 
     .alias("rowNum") 
     ) 
.show() 
) 

+----+-------+------+ 
| k|  v|rowNum| 
+----+-------+------+ 
|key1|[1,2,3]|  1| 
|key1|[1,4,7]|  2| 
|key1|[2,2,3]|  3| 
|key2|[5,5,5]|  1| 
|key2|[5,5,9]|  2| 
|key2|[7,5,5]|  3| 
+----+-------+------+ 
4

這是一個有趣的問題,你正在提出。我會用Python來回答它,但我相信你可以無縫地翻譯到Scala。

這裏是我會怎麼對付它:

1-簡化您的數據:現在

temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3]))) 

TEMP2是一個 「真正」 的鍵值對。這看起來:

[ 
((3, 4), (5, 5, 5)), 
((3, 4), (5, 5, 9)), 
((3, 4), (7, 5, 5)), 
((1, 2), (1, 2, 3)), 
((1, 2), (1, 4, 7)), 
((1, 2), (2, 2, 3)) 

]

2-然後,使用基團的按功能來再現的分區的效果BY:現在

temp3 = temp2.groupByKey() 

TEMP3是具有2 RDD行:

[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>), 
((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)] 

3-現在,您需要爲RDD的每個值應用排名函數。在Python中,我會使用簡單的排序功能(枚舉將創建ROW_NUMBER列):

temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10) 

注意,要實現你的特定的順序,你將需要養活右「鍵」的說法(在python,我只想創造一個lambda函數像:

lambda tuple : (tuple[0],-tuple[1],tuple[2]) 

末(沒有密鑰參數的功能,它看起來像):

[ 
((1, 2), ((1, 2, 3), 0)), 
((1, 2), ((1, 4, 7), 1)), 
((1, 2), ((2, 2, 3), 2)), 
((3, 4), ((5, 5, 5), 0)), 
((3, 4), ((5, 5, 9), 1)), 
((3, 4), ((7, 5, 5), 2)) 

]

希望有所幫助!

祝你好運。