
I use a Jupyter notebook with pandas, but when I work with Spark I want to do the transformations and calculations with Spark DataFrames instead of pandas. Please help me convert a couple of calculations to Spark DataFrame or RDD operations. (Spark DataFrame operators: nunique, multiplication)

The DataFrame:

df = 
+---------+-------+-------+-------+ 
| userId  | item  | price | value | 
+---------+-------+-------+-------+ 
| 169     | I0111 | 5300  | 1     | 
| 169     | I0973 | 70    | 1     | 
| 336     | C0174 | 455   | 1     | 
| 336     | I0025 | 126   | 1     | 
| 336     | I0973 | 4     | 1     | 
| 770963  | B0166 | 2     | 1     | 
| 1294537 | I0110 | 90    | 1     | 
+---------+-------+-------+-------+ 

1. Calculation with pandas:

(1) userItem = df.groupby(['userId'])['item'].nunique() 

The result is a Series object:

+---------+------+ 
| userId  | item | 
+---------+------+ 
| 169     | 2    | 
| 336     | 3    | 
| 770963  | 1    | 
| 1294537 | 1    | 
+---------+------+ 

2. Using multiplication:

data_sum = df.groupby(['userId', 'item'])['value'].sum() --> result is a Series object 

average_played = np.mean(userItem) --> result is a number 

(2) weighted_games_played = data_sum * (average_played/userItem) 
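Put together, a minimal runnable pandas sketch of (1) and (2); the DataFrame construction is an assumption (rebuilt from the sample table above), while the groupby/mean/multiplication lines are the ones from the question:

import numpy as np 
import pandas as pd 

# sample data reconstructed from the table above (assumption) 
df = pd.DataFrame({ 
    'userId': [169, 169, 336, 336, 336, 770963, 1294537], 
    'item': ['I0111', 'I0973', 'C0174', 'I0025', 'I0973', 'B0166', 'I0110'], 
    'price': [5300, 70, 455, 126, 4, 2, 90], 
    'value': [1, 1, 1, 1, 1, 1, 1], 
}) 

# (1) number of distinct items per user -> Series indexed by userId 
userItem = df.groupby(['userId'])['item'].nunique() 

# (2) as in the question: userItem (indexed by userId) is broadcast 
# against the userId level of data_sum's MultiIndex 
data_sum = df.groupby(['userId', 'item'])['value'].sum() 
average_played = np.mean(userItem) 
weighted_games_played = data_sum * (average_played/userItem) 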

Please help me do (1) and (2) using Spark DataFrames and Spark operators.

Answer


You can achieve (1) with something like the following:

import pyspark.sql.functions as f 

userItem = df.groupby('userId').agg(f.expr('count(distinct item)').alias('n_item')) 
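Equivalently, a short sketch using f.countDistinct instead of the SQL expression string (same n_item alias, same result):

import pyspark.sql.functions as f 

userItem = df.groupby('userId').agg(f.countDistinct('item').alias('n_item')) 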

And for (2):

# sum of `value` per (userId, item) 
data_sum = df.groupby(['userId', 'item']).agg(f.sum('value').alias('sum_value')) 

# single-row DataFrame holding the mean number of distinct items per user 
average_played = userItem.agg(f.mean('n_item').alias('avg_played')) 

# attach n_item per user and the global average to every row, then compute the weight 
data_sum = data_sum.join(userItem, on='userId').crossJoin(average_played) 
data_sum = data_sum.withColumn("weighted_games_played", f.expr("sum_value*avg_played/n_item")) 
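For reference, a minimal end-to-end sketch of the whole pipeline; it assumes an existing SparkSession named spark and rebuilds the sample DataFrame from the question:

import pyspark.sql.functions as f 

# sample rows taken from the table in the question (assumption) 
df = spark.createDataFrame( 
    [(169, 'I0111', 5300, 1), (169, 'I0973', 70, 1), 
     (336, 'C0174', 455, 1), (336, 'I0025', 126, 1), 
     (336, 'I0973', 4, 1), (770963, 'B0166', 2, 1), 
     (1294537, 'I0110', 90, 1)], 
    ['userId', 'item', 'price', 'value']) 

# (1) distinct items per user 
userItem = df.groupby('userId').agg(f.countDistinct('item').alias('n_item')) 

# (2) weighted value per (userId, item) 
data_sum = df.groupby(['userId', 'item']).agg(f.sum('value').alias('sum_value')) 
average_played = userItem.agg(f.mean('n_item').alias('avg_played')) 

result = (data_sum 
          .join(userItem, on='userId') 
          .crossJoin(average_played) 
          .withColumn('weighted_games_played', 
                      f.expr('sum_value*avg_played/n_item'))) 

result.show() 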

What I meant was multiplication between pandas Series objects, but with Spark I can't do that ( **weighted_games_played = data_sum * (average_played/userItem)** ) –


Gotcha, I'll revise the answer. – ags29


Hmm, it works. –