2017-03-17 96 views
0

我有一個火花數據幀是這樣的:如何扁平pySpark數據框?

id | Operation | Value | 
-------------------------- 
1 | Date_Min | 148590 | 
1 | Date_Max | 148590 | 
1 | Device | iphone | 
2 | Date_Min | 148590 | 
2 | Date_Max | 148590 | 
2 | Review | Good | 
3 | Date_Min | 148590 | 
3 | Date_Max | 148590 | 
3 | Review | Bad | 
3 | Review | samsung| 

我使用的Spark 2.1.0與pyspark:

id |   Operation     |  Value 
----------------------------------------------------------- 
1 | [Date_Min, Date_Max, Device]   | [148590, 148590, iphone]  
2 | [Date_Min, Date_Max, Review]   | [148590, 148590, Good]  
3 | [Date_Min, Date_Max, Review, Device] | [148590, 148590, Bad,samsung]  

,我期待resul。我試過這個solution,但它只適用於一列。

感謝

+0

我仍然無法找出好辦法做到這一點特定任務。我試圖分開展開列'df1 = df.select('id',explode(col(「Operation」)))', 'df2 = df.select('id',explode(col(「Value」)) )'。但是,如何將兩個數據框水平地堆疊在一起沒有很好的解決方案。 – titipata

回答

0

下面是一個例子數據幀從上面。我使用這個solution爲了解決你的問題。

df = spark.createDataFrame(
    [[1, ['Date_Min', 'Date_Max', 'Device'], ['148590', '148590', 'iphone']], 
     [2, ['Date_Min', 'Date_Max', 'Review'], ['148590', '148590', 'Good']],  
     [3, ['Date_Min', 'Date_Max', 'Review', 'Device'], ['148590', '148590', 'Bad', 'samsung']]], 
    schema=['id', 'l1', 'l2']) 

在這裏,您可以定義udf先爲每行壓縮兩個列表。

from pyspark.sql.types import * 
from pyspark.sql.functions import col, udf, explode 

zip_list = udf(
    lambda x, y: list(zip(x, y)), 
    ArrayType(StructType([ 
     StructField("first", StringType()), 
     StructField("second", StringType()) 
    ])) 
) 

最後,您可以將兩列壓縮在一起,然後展開該列。

df_out = df.withColumn("tmp", zip_list('l1', 'l2')).\ 
    withColumn("tmp", explode("tmp")).\ 
    select('id', col('tmp.first').alias('Operation'), col('tmp.second').alias('Value')) 
df_out.show() 

輸出

+---+---------+-------+ 
| id|Operation| Value| 
+---+---------+-------+ 
| 1| Date_Min| 148590| 
| 1| Date_Max| 148590| 
| 1| Device| iphone| 
| 2| Date_Min| 148590| 
| 2| Date_Max| 148590| 
| 2| Review| Good| 
| 3| Date_Min| 148590| 
| 3| Date_Max| 148590| 
| 3| Review| Bad| 
| 3| Device|samsung| 
+---+---------+-------+ 
+0

謝謝!它確實很好。 – Omar14

+0

沒問題@ Omar14! – titipata

+0

最後,我仍然有一個函數zip_list的問題。當我使用Zeppelin筆記本時,它可以工作,但是當我嘗試使用spark-submit自動執行作業和腳本時,作業失敗,出現此錯誤: ''zip argument#1 must support iteration' – Omar14

-1

如果使用數據幀,然後試試這個: -

import pyspark.sql.functions as F 

your_df.select("id", F.explode("Operation"), F.explode("Value")).show() 
+0

當我在同一時間爆炸2列時,它不起作用。 – titipata