2017-06-21 49 views
1

我有2個dataframes在火花(pyspark)火花更新數據幀與在條件蟒

 
DF_A 
col1 col2 col3 
a  1  100 
b  2  300 
c  3  500 
d  4  700 


DF_B 
col1 col3 
a  150 
b  350 
c  0 
d  650 

我想無論本與值DF_B.col3更新DF A的列。

目前我做

df_new = df_a.join(df_b, df_a.col1 == df_b.col1,'inner') 

而且它給我COL1 X 2次,並在df_new COL3 X 2倍。 現在我不得不放棄不相關的單元格來顯示0.什麼是更好的方法來做到這一點?不使用udfs。

+0

什麼是你需要的最終結果。我不太清楚我的理解。 – eliasah

+0

它更像這樣的sql語句:UPDATE table_a A,table_b B SET A.col3 = B.col3 WHERE A.col1 = B.col1;在數據框上。如果不存在於B中,則爲0 – Viv

回答

1

如果我理解你的問題正確,您試圖進行以下操作:

UPDATE表-A A,表-B B設置A.col3 = B.col3 WHERE A.col1 = B.col1;在數據框上。如果不存在於B中,則0。 (參看註釋)

a = [("a",1,100),("b",2,300),("c",3,500),("d",4,700)] 
b = [("a",150),("b",350),("d",650)] 
df_a = spark.createDataFrame(a,["col1","col2","col3"]) 
df_b = spark.createDataFrame(b,["col1","col3"]) 

df_a.show() 
# +----+----+----+ 
# |col1|col2|col3| 
# +----+----+----+ 
# | a| 1| 100| 
# | b| 2| 300| 
# | c| 3| 500| 
# | d| 4| 700| 
# +----+----+----+ 

df_b.show() # I have removed an entry for the purpose of the demo. 
# +----+----+ 
# |col1|col3| 
# +----+----+ 
# | a| 150| 
# | b| 350| 
# | d| 650| 
# +----+----+ 

你需要執行outer join後跟一個​​3210:

from pyspark.sql import functions as F 

df_a.withColumnRenamed('col3','col3_a') \ 
    .join(df_b.withColumnRenamed('col3','col3_b'), on='col1', how='outer') \ 
    .withColumn("col3", F.coalesce('col3_b', F.lit(0))) \ 
    .drop(*['col3_a','col3_b']).show() 
# +----+----+----+ 
# |col1|col2|col3| 
# +----+----+----+ 
# | d| 4| 650| 
# | c| 3| 0| 
# | b| 2| 350| 
# | a| 1| 150| 
# +----+----+----+