2017-09-14 95 views
1

我是Spark新手,需要從單行生成多行和多列。如何在pyspark中從單行添加多行和多列?

輸入:

col1 col2 col3 col4 

輸出

col1 col2 col3 col4 col5 col6 col7 

col1 col2 col3 col4 col8 col9 col10 

Logics for new columns: 

**col5 :** 

if col1==0 and col3!=0: 
    col5 = col4/col3 

else: 
    col5 = 0 


**col6 :** 

if col1==0 and col4!=0: 
    col6 = (col3*col4)/col1 

else: 
    col6 = 0 

For first row col7 holds same value as col2 

**col8 :** 

if col1!=0 and col3!=0: 
    col8 = col4/col3 

else: 
    col8 = 0 
**col9 :** 

if col1!=0 and col4!=0: 
    col9 = (col3*col4)/col1 

else: 
    col9 = 0 

For second row col10 = col2+ "_NEW" 

在結束 '和' 功能需要由與組施加。一旦我們轉換上述結構,希望這會很容易。

google中的大多數文章解釋瞭如何使用「withcolumn」選項而不是多列來將單列添加到現有數據框。文章沒有解釋這種情況。所以我想請求你的幫助。

回答

0

有幾個選項:

  1. 使用withColumn多次,因爲你需要(即你需要多少列添加)數據幀
  2. 使用map解析柱,以適當的列返回然後創建DataFrame。
2

希望這有助於!

from pyspark.sql.functions import col, when, lit, concat, round, sum 

#sample data 
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"]) 

#populate col5, col6, col7 
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col7 = col('col2') 
df1 = df.withColumn("col5", col5).\ 
    withColumn("col6", col6).\ 
    withColumn("col7", col7) 

#populate col8, col9, col10 
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col10= concat(col('col2'), lit("_NEW")) 
df2 = df.withColumn("col5", col8).\ 
    withColumn("col6", col9).\ 
    withColumn("col7", col10) 

#final dataframe 
final_df = df1.union(df2) 
final_df.show() 

#groupBy calculation 
#final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")).show() 

輸出是:

+----+----+----+----+----+----+-----+ 
|col1|col2|col3|col4|col5|col6| col7| 
+----+----+----+----+----+----+-----+ 
| 1| 2| 3| 4| 0.0| 0.0| 2| 
| 5| 6| 7| 8| 0.0| 0.0| 6| 
| 1| 2| 3| 4|1.33|12.0|2_NEW| 
| 5| 6| 7| 8|1.14|11.2|6_NEW| 
+----+----+----+----+----+----+-----+ 


不要忘了讓我們知道是否能解決你的問題:)

+0

它完美地工作。謝謝 !如果可能的話,你能否給我建議一些學習材料,如書籍/論壇或在pyspark中熟悉的想法。 – user3150024

+0

很高興它幫助!最好的地方是參考['Programming Guides'section](http://spark.apache.org/docs/latest/index.html)。 (順便說一句 - 如果你喜歡這個答案,那麼你應該[把它投票/標記爲正確答案]) – Prem

+0

相同的查詢在pyspark shell中執行得更快,但CLI 。我的源數據以ORC格式存在於Hive中。任何建議請 – user3150024