2017-04-11 46 views
0

我有一個DateFrame爲A,如:怎樣避免使用一起創造新的數據幀

+----+------+------+------+ 
    | key| val1 | val2 | date | 
    +----+------+---------- --+ 
    |k1 |v4 |v7 |d1 | 
    |k1 |v5 |v8 |d2 | 
    |k1 |v6 |v9 |d3 | 
    |k2 |v12 |v22 |d1 | 
    |k2 |v32 |v42 |d2 | 
    |k2 |v11 |v21 |d3 | 
    +----+------+------+------+ 

我想創建一個新的DateFrame爲B,如:

+----+------+------+------+------+------+------+ 
    | key| d1v1 |d1v2 |d2v1 | d2v2 |d3v1 |d3v2 | 
    +----+------+------+------+------+------+------+ 
    |k1 |v4 |v7 |v5 |v8 |v6 |v9 | 
    |k2 |v12 |v22 |v32 |v42 |v11 |v21 | 
    +----+------+------+------+------+------+------+ 

我的解決方案:

B = A.select('key').distinct() 
tmp_d1v1 = A.select('key','val1').where(F.col('date') == 'd1') 
tmp_d1v1 = tmp_d1v1.withColumnRenamed('val1' ,'d1v1')   
B = B.join(tmp_d1v1, 'key' , 'left_outer') 
tmp_d1v2 = A.select('key','val2').where(F.col('date') == 'd1') 
tmp_d1v2 = tmp_d1v2.withColumnRenamed('val2' ,'d1v2')   
B = B.join(tmp_d1v2, 'key' , 'left_outer') 

tmp_d2v1 = A.select('key','val1').where(F.col('date') == 'd2') 
tmp_d2v1 = tmp_d1v1.withColumnRenamed('val1' ,'d2v1')   
B = B.join(tmp_d2v1, 'key' , 'left_outer') 
tmp_d2v2 = A.select('key','val2').where(F.col('date') == 'd2') 
tmp_d2v2 = tmp_d2v2.withColumnRenamed('val2' ,'d2v2')   
B = B.join(tmp_d2v2, 'key' , 'left_outer') 

tmp_d3v1 = A.select('key','val1').where(F.col('date') == 'd3') 
tmp_d3v1 = tmp_d3v1.withColumnRenamed('val1' ,'d1v1')   
B = B.join(tmp_d3v1, 'key' , 'left_outer') 
tmp_d3v2 = A.select('key','val2').where(F.col('date') == 'd3') 
tmp_d3v2 = tmp_d3v2.withColumnRenamed('val2' ,'d1v2')   
B = B.join(tmp_d3v2, 'key' , 'left_outer') 

在我的項目中,列日期中的值的數量是7,像值的列的數量是24.所以,我的解決方案將運行連接7 * 24 = 168次。現在,我遇到了OOM的問題。我希望找到替代解決方案,無需加入操作或減少它。

+0

請您詳細說明爲什麼需要7 * 24(我理解7個連接,但不是「* 24」)。 –

+0

我更新了DataFrame A和B的結構,添加了一列作爲val2。你可以看看我們有2列作爲val1和val2,列數據上的值的數量是3.在我的解決方案中,連接需要運行2 * 3 = 6次。 –

回答

1

謝謝大家給一些提示,我還提到了其他answer。我是張貼我的回答,希望你欣賞它。

#first, I defined the Dataframe A: 
data = [('k1', 'v4', 'v7', 'd1'), 
    ('k1', 'v5', 'v8', 'd2'), 
    ('k1', 'v6', 'v9', 'd3'), 
    ('k2', 'v12', 'v22', 'd1'), 
    ('k2', 'v32', 'v42', 'd2'), 
    ('k2', 'v11', 'v21', 'd3')] 
A = spark.createDataFrame(data, ['key', 'val1', 'val2','date']) 
#second, using Pivot to define Dataframe B: 
from pyspark.sql.functions 
B = A.groupBy('key').pivot('date') \ 
      .agg(first('val1').alias('v1'),first('val2').alias('v2')) 
#result is: 
A.show() 
+---+----+----+----+ 
|key|val1|val2|date| 
+---+----+----+----+ 
| k1| v4| v7| d1| 
| k1| v5| v8| d2| 
| k1| v6| v9| d3| 
| k2| v12| v22| d1| 
| k2| v32| v42| d2| 
| k2| v11| v21| d3| 
+---+----+----+----+ 
B.show() 
+---+-----+-----+-----+-----+-----+-----+ 
|key|d1_v1|d1_v2|d2_v1|d2_v2|d3_v1|d3_v2| 
+---+-----+-----+-----+-----+-----+-----+ 
| k2| v12| v22| v32| v42| v11| v21| 
| k1| v4| v7| v5| v8| v6| v9| 
+---+-----+-----+-----+-----+-----+-----+ 
+0

這樣做更有意義。謝謝! –

0

我相信pivot是你所需要的