I need to generate a full list of row_numbers for a data table with many columns. What is the equivalent of SQL's row_number for a Spark RDD?
In SQL, it would look like this:
select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;
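(A hedged aside: in later Spark releases the DataFrame API exposes this same window function directly, so if upgrading is an option the SQL above maps almost one-to-one. A minimal sketch, assuming Spark 1.6+ where the function is named `row_number` in `org.apache.spark.sql.functions`, and assuming `df` is a DataFrame with columns key_value, col1, col2, col3:

```scala
// Sketch for newer Spark only (not the 1.1-era RDD API the question targets).
// Assumes `df` is a hypothetical DataFrame with the four columns above.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val w = Window.partitionBy("key_value")
              .orderBy(df("col1"), df("col2").desc, df("col3"))
val numbered = df.withColumn("rownum", row_number().over(w))
```

This mirrors the SQL `partition by` / `order by` clauses term for term.)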
Now, say that in Spark I have an RDD of the form (K, V), where V = (col1, col2, col3), so my entries look like
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
and I want to end up with the correct row_numbers, like
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
ordering them using commands such as sortBy(), sortWith(), sortByKey(), zipWithIndex, etc., to produce a new RDD.
(I don't care about the parentheses, so the form could also be (K, (col1, col2, col3, rownum)).)
How do I do this?
Here is my first attempt:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)
temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)
// note that this isn't ordering with a partition on key value K!
val temp2 = temp1.???
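One way to get the per-key numbering that the attempt above is missing is to group by key and sort each group's values locally. This is a sketch, not a definitive answer: it assumes every key's value list fits in a single executor's memory, since groupByKey materializes each group.

```scala
// Sketch: per-key row numbers via groupByKey. Each key's values are sorted
// by (col1, col2 desc, col3) and paired with a 1-based index. Assumes each
// group fits in memory on one executor.
val temp2 = temp1
  .map { case (k, c1, c2, c3) => (k, (c1, c2, c3)) }
  .groupByKey()
  .flatMap { case (k, vals) =>
    vals.toSeq
        .sortBy { case (c1, c2, c3) => (c1, -c2, c3) }   // -c2 gives the desc order
        .zipWithIndex
        .map { case ((c1, c2, c3), i) => (k, c1, c2, c3, i + 1) }
  }
```

Negating col2 for descending order works here because the columns are Ints; for general types one would supply an explicit Ordering instead.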
Note also that the function sortBy cannot be applied directly to an RDD; one must run collect() first, and then the output is not an RDD either, but an Array:
temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
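(A small hedged correction to the note above: as far as I know, RDD does have a sortBy method since Spark 1.0, so collecting first isn't strictly required for the ordering itself; it just doesn't solve the per-key partitioning problem either:

```scala
// Sketch: RDD.sortBy keeps the data distributed; a tuple key gives the
// composite (col1, col2 desc, col3) ordering. Still a single global sort,
// not a numbering restarted per key.
val sortedRdd = temp1.sortBy(a => (a._2, -a._3, a._4))
```

The implicit Ordering for tuples of Ints makes the composite key work without extra code.)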
Here is a little more progress, but it still doesn't partition:
val temp2 = sc.parallelize(
    temp1.map(a => (a._1, (a._2, a._3, a._4)))
         .collect()
         .sortBy(a => a._2._1 -> -a._2._2 -> a._2._3))
  .zipWithIndex
  .map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))
temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
Several other questions answer parts of this question, namely http://stackoverflow.com/questions/23838614/how-to-sort-an-rdd-in-scala-spark, http://qnalist.com/questions/5086896/spark-sql-how-to-select-first-row-in-each-group-by-group, http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3CD01B658B.2BF52%[email protected]%3E, http://stackoverflow.com/questions/27022059/filter-rdd-based-on-row-number, http://stackoverflow.com/questions/24677180/how-do-i-select-a-range-of-elements-in-spark-rdd – 2014-11-20 22:03:13
I'm also looking for an answer to this question. [Hive added analytic functions (including `row_number()`) in 0.11](https://issues.apache.org/jira/browse/HIVE-896), and Spark 1.1 supports HiveQL/Hive 0.12, so it seems like `sqlContext.hql("select row_number() over (partition by ...")` should work, but I get an error. – dnlbrky 2014-11-23 03:52:44