2

我是新來pyspark如何分組數據存儲到JSON在pyspark

我有一個數據集,它看起來像(只有幾列的快照)

data description

我希望將我的數據按鍵。我的關鍵是

CONCAT(a.div_nbr,a.cust_nbr) 

我的最終目標是將數據轉換成JSON,格式化像轉換該

k1[{v1,v2,....},{v1,v2,....}], k2[{v1,v2,....},{v1,v2,....}],.... 

248138339 [{ PRECIMA_ID:SCP 00248 0000138339, PROD_NBR:5553505, PROD_DESC:Shot and a Beer Battered Onion Rings (5553505 and 9285840) , PROD_BRND:Molly's Kitchen,PACK_SIZE:4/2.5 LB, QTY_UOM:CA } , 
     { PRECIMA_ID:SCP 00248 0000138339 , PROD_NBR:6659079 , PROD_DESC:Beef Chuck Short Rib Slices, PROD_BRND:Stockyards , PACK_SIZE:12 LBA , QTY_UOM:CA} ,{...,...,} ], 

1384611034793 [{},{},{}] ....

我創建了一個數據框(我將兩個表基本上得到一些更多的領域)

joinstmt = sqlContext.sql(
      "SELECT a.precima_id , CONCAT(a.div_nbr,a.cust_nbr) as 
        key,a.prod_nbr , a.prod_desc,a.prod_brnd ,  a.pack_size , a.qty_uom , a.sales_opp , a.prc_guidance , a.pim_mrch_ctgry_desc , a.pim_mrch_ctgry_id , b.start_date,b.end_date 

FROM scoop_dtl上(a.precima_id = b.precima_id)「)的聯接scoop_hdr B現在

,爲了得到上述結果,我需要GROUP BY結果基於關鍵,我做了以下

groupbydf = joinstmt.groupBy("key") 

這導致INTP分組的數據和閱讀我知道,我不能直接使用它,我需要將其轉換回dataframes存儲之後。

我是新來的,需要一些幫助,以序轉換回dataframes或是否有任何其他的方式,以及我將不勝感激。

回答

2

不能直接使用GroupedData。它必須首先彙總。它可以通過使用collect_list等內置函數進行彙總來部分覆蓋,但使用DataFrameWriter無法實現所需的輸出,並且使用代表密鑰的值。

在可以嘗試這樣的事情,而不是:

from pyspark.sql import Row 
import json 

def make_json(kvs): 
    k, vs = kvs 
    return json.dumps({k[0]: list(vs)}) 

(df.select(struct(*keys), values) 
    .rdd 
    .mapValues(Row.asDict) 
    .groupByKey() 
    .map(make_json)) 

saveAsTextFile

+0

問題澄清:什麼變量「KVS」,「*鍵」和「值」對應於OP的例子? – Quetzalcoatl

3

如果您加入了數據幀是這樣的:

gender age 
M 5 
F 50 
M 10 
M 10 
F 10 

然後,您可以使用下面的代碼,從而獲得所需的輸出

joinedDF.groupBy("gender") \ 
    .agg(collect_list("age").alias("ages")) \ 
    .write.json("jsonOutput.txt") 

輸出看起來象下面這樣:

{"gender":"F","ages":[50,10]} 
{"gender":"M","ages":[5,10,10]} 

在如果您有多個欄目,如姓名,薪水。您可以添加列象下面這樣:

df.groupBy("gender") 
    .agg(collect_list("age").alias("ages"),collect_list("name").alias("names")) 

你的輸出看起來像:

{"gender":"F","ages":[50,10],"names":["ankit","abhay"]} 
{"gender":"M","ages":[5,10,10],"names":["snchit","mohit","rohit"]} 
+0

謝謝 - 根據op的問題,我們如何將解決方案擴展到更多領域的數據?例如。如果joinedDF包含[{'gender':'M','name':'kelly','age':20},{'gender':M,'name':'bob','age':41}] ,然後按'性別'分組,我們實現了:{'gender':'M','names':['kelly','bob'],'ages':[20,41]} – Quetzalcoatl

+0

更新了我的答案。希望有所幫助。 –