2016-05-31 196 views
0

我在使用python將spark結構的RDD轉換爲spark中的數據框時遇到困難。將變化的元組的RDD轉換爲Spark中的DataFrame

df1=[['usr1',('itm1',2),('itm3',3)], ['usr2',('itm2',3), ('itm3',5),(itm22,6)]] 

轉換後,我的數據幀應該如下所示:

 usr1 usr2 
itm1 2.0 NaN 
itm2 NaN 3.0 
itm22 NaN 6.0 
itm3 3.0 5.0 

我最初想coverting上述RDD結構如下:

df1={'usr1': {'itm1': 2, 'itm3': 3}, 'usr2': {'itm2': 3, 'itm3': 5, 'itm22':6}} 

然後使用Python的大熊貓模塊pand=pd.DataFrame(dat2),然後使用spark_df = context.createDataFrame(pand)將pandas數據幀轉換回火花數據幀。但是,我相信,通過這樣做,我將RDD轉換爲非RDD對象,然後轉換回RDD,這是不正確的。有些人可以幫我解決這個問題嗎?

+0

這是怎麼回事,不包括列選擇,[從你以前的問題](http://stackoverflow.com/q/37514344/1560062)? – zero323

+0

請注意,在我之前的問題中,我更關心處理同一用戶的重複「itms」(請參閱​​「如果在上面的元組中有多個計數字段,即('itm1',3)如何合併(或添加)這個值3到列聯表(或實體 - 項目矩陣)的最終結果中。「由於給出的答案仍然不清楚(至少從我的角度來看),如果我能夠爲此得到一個解決方案問題,我可以關閉對我以前的問題的答案。 – Rkz

回答

2

有了這樣的數據:

rdd = sc.parallelize([ 
    ['usr1',('itm1',2),('itm3',3)], ['usr2',('itm2',3), ('itm3',5),('itm22',6)] 
]) 

拼合記錄:

def to_record(kvs): 
    user, *vs = kvs # For Python 2.x use standard indexing/splicing 
    for item, value in vs: 
     yield user, item, value 

records = rdd.flatMap(to_record) 

轉換到DataFrame

df = records.toDF(["user", "item", "value"]) 

支點:

result = df.groupBy("item").pivot("user").sum() 

result.show() 
## +-----+----+----+ 
## | item|usr1|usr2| 
## +-----+----+----+ 
## | itm1| 2|null| 
## | itm2|null| 3| 
## | itm3| 3| 5| 
## |itm22|null| 6| 
## +-----+----+----+ 

備註:Spark DataFrames旨在處理長時間和相對較薄的數據。如果要生成廣泛的應急表,DataFrames將不會有用,特別是在數據密集且您希望爲每個功能保留單獨列的情況下。

+0

完美的非常感謝!並且感謝那些額外的信息。 – Rkz

相關問題