
Finding unique tuples: I have user activity data from a shopping platform in a PySpark RDD, with rows of the form:

user_id | product_id | event (viewed product, purchased, added to cart, etc.)

The thing is, there can be multiple event types for the same (user_id, product_id) tuple. I want to collect all of those events into a single row.

Example:

╔════════════════════════════════════╗
║ user_id | product_id | Event       ║
╠════════════════════════════════════╣
║ 1       | 1          | viewed      ║
║ 1       | 1          | purchased   ║
║ 2       | 1          | added       ║
║ 2       | 2          | viewed      ║
║ 2       | 2          | added       ║
╚════════════════════════════════════╝

What I want:

╔════════════════════════════════════════════╗
║ user_id | product_id | Event               ║
╠════════════════════════════════════════════╣
║ 1       | 1          | {viewed, purchased} ║
║ 2       | 1          | {added}             ║
║ 2       | 2          | {viewed, added}     ║
╚════════════════════════════════════════════╝

Have you looked at using the built-in 'map' and 'groupByKey' functions? – jtmingus

Answers


In Scala it would look something like this:

// assumes rdd: RDD[(Int, Int, String)] holding (user_id, product_id, event) triples
val grouped: RDD[((Int, Int), Iterable[String])] = rdd.map(t => ((t._1, t._2), t._3)).groupByKey()
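
Since the question is about PySpark, here is a rough Python equivalent of the snippet above (a minimal sketch, assuming rdd holds (user_id, product_id, event) triples; grouped is just an illustrative name):

# minimal PySpark sketch of the same map + groupByKey approach;
# assumes rdd contains (user_id, product_id, event) triples
grouped = rdd.map(lambda t: ((t[0], t[1]), t[2])).groupByKey()
grouped.mapValues(set).collect()  # turn each group's events into a set, as in the desired output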

If you want to try DataFrames, take a look at this:

import pyspark.sql.functions as F

# build a DataFrame from the sample data
rdd = sc.parallelize([[1, 1, 'viewed'], [1, 1, 'purchased'], [2, 1, 'added'], [2, 2, 'viewed'], [2, 2, 'added']])
df = rdd.toDF(['user_id', 'product_id', 'Event'])

# collect the distinct events for each (user_id, product_id) pair
df.groupby(['user_id', 'product_id']).agg(F.collect_set("Event")).show()
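
If you want the aggregated column to carry a nicer name, you can alias it (alias is standard pyspark.sql Column API; "Events" is just an illustrative name):

# same aggregation, but naming the collected column "Events"
df.groupby(['user_id', 'product_id']).agg(F.collect_set("Event").alias("Events")).show()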

If you prefer to stay with the RDD, it looks like this:

rdd = sc.parallelize([[1, 1, 'viewed'], [1, 1, 'purchased'], [2, 1, 'added'], [2, 2, 'viewed'], [2, 2, 'added']])
# group by (user_id, product_id) and collect each group's events into a list
rdd.groupBy(lambda x: (x[0], x[1])).map(lambda kv: (kv[0][0], kv[0][1], [e[2] for e in kv[1]])).collect()
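
To match the set-style output in the question (e.g. {viewed, purchased}), the events for each key can be deduplicated with set() instead of being collected into a list (a small variation on the snippet above):

# same grouping, but dedupe the events per (user_id, product_id) into a set
rdd.groupBy(lambda x: (x[0], x[1])).map(lambda kv: (kv[0][0], kv[0][1], set(e[2] for e in kv[1]))).collect()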