2017-06-05 103 views
0

我想在pyspark中使用PrefixSpan序列挖掘。數據的,我需要的格式如下:pyspark將交易轉換爲列表

[[['a', 'b'], ['c']], [['a'], ['c', 'b'], ['a', 'b']], [['a', 'b'], ['e']], [['f']]] 

在最裏面的元素是productIds,則有訂單(含產品清單),然後有客戶(含訂單列表)。

我的數據有交易格式:

clientId orderId product 

其中訂單ID有獨立的產品和具有的clientId單獨的訂單多行多行。

的樣本數據:

test = sc.parallelize([[u'1', u'100', u'a'], 
[u'1', u'100', u'a'], 
[u'1', u'101', u'b'], 
[u'2', u'102', u'c'], 
[u'3', u'103', u'b'], 
[u'3', u'103', u'c'], 
[u'4', u'104', u'a'], 
[u'4', u'105', u'b']] 
) 


我的解決方案至今:
1.集團的產品訂單:

order_prod = test.map(lambda x: [x[1],([x[2]])]) 
order_prod = order_prod.reduceByKey(lambda a,b: a + b) 
order_prod.collect() 

導致:

[(u'102', [u'c']), 
(u'103', [u'b', u'c']), 
(u'100', [u'a', u'a']), 
(u'104', [u'a']), 
(u'101', [u'b']), 
(u'105', [u'b'])] 

2 。在客戶組的訂單:

client_order = test.map(lambda x: [x[0],[(x[1])]]) 
df_co = sqlContext.createDataFrame(client_order) 
df_co = df_co.distinct() 
client_order = df_co.rdd.map(list) 
client_order = client_order.reduceByKey(lambda a,b: a + b) 
client_order.collect() 

導致:

[(u'4', [u'105', u'104']), 
(u'3', [u'103']), 
(u'2', [u'102']), 
(u'1', [u'100', u'101'])] 

然後我想有這樣的名單:

[[[u'a', u'a'],[u'b']], [[u'c']], [[u'b', u'c']], [[u'a'],[u'b']]] 

回答

0

下面是使用PySpark數據框(溶液不我使用PySpark 2.1)。首先,您必須將RDD轉換爲Dataframe。

df = test.toDF(['clientId', 'orderId', 'product']) 

這是編組數據幀的代碼片段。基本思路是先將clientIdorderId分組,然後將聚合product列在一起。然後再由clientId再次分組。

import pyspark.sql.functions as func 

df_group = df.groupby(['clientId', 'orderId']).agg(func.collect_list('product').alias('product_list')) 
df_group_2 = df_group[['clientId', 'product_list']].\ 
    groupby('clientId').\ 
    agg(func.collect_list('product_list').alias('product_list_group')).\ 
    sort('clientId', ascending=True) 
df_group_2.rdd.map(lambda x: x.product_list_group).collect() # collect output here 

結果如下:

[[['a', 'a'], ['b']], [['c']], [['b', 'c']], [['b'], ['a']]]