2017-04-18
val df1 = sc.parallelize(Seq(
    ("a1",10,"ACTIVE","ds1"), 
    ("a1",20,"ACTIVE","ds1"), 
    ("a2",50,"ACTIVE","ds1"), 
    ("a3",60,"ACTIVE","ds1")) 
).toDF("c1","c2","c3","c4")

val df2 = sc.parallelize(Seq(
    ("a1",10,"ACTIVE","ds2"), 
    ("a1",20,"ACTIVE","ds2"), 
    ("a1",30,"ACTIVE","ds2"), 
    ("a1",40,"ACTIVE","ds2"), 
    ("a4",20,"ACTIVE","ds2")) 
).toDF("c1","c2","c3","c5")


df1.show() 

// +---+---+------+---+ 
// | c1| c2| c3| c4| 
// +---+---+------+---+ 
// | a1| 10|ACTIVE|ds1| 
// | a1| 20|ACTIVE|ds1| 
// | a2| 50|ACTIVE|ds1| 
// | a3| 60|ACTIVE|ds1| 
// +---+---+------+---+ 

df2.show() 
// +---+---+------+---+ 
// | c1| c2| c3| c5| 
// +---+---+------+---+ 
// | a1| 10|ACTIVE|ds2| 
// | a1| 20|ACTIVE|ds2| 
// | a1| 30|ACTIVE|ds2| 
// | a1| 40|ACTIVE|ds2| 
// | a4| 20|ACTIVE|ds2| 
// +---+---+------+---+ 

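My requirement: I need to join two DataFrames. The output DataFrame should contain all records from df1, plus those records from df2 that are not in df1 but whose "c1" value matches a "c1" value in df1. The records I pull from df2 should have column "c3" updated to INACTIVE. How do I join two DataFrames and change the column for the missing values?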

In this example, the only matching value of "c1" is a1. So I need to pull the records with c2 = 30 and c2 = 40 from df2 and make them INACTIVE.

Here is the expected output.

df_output.show() 

// +---+---+--------+---+ 
// | c1| c2| c3 | c4| 
// +---+---+--------+---+ 
// | a1| 10|ACTIVE |ds1| 
// | a1| 20|ACTIVE |ds1| 
// | a2| 50|ACTIVE |ds1| 
// | a3| 60|ACTIVE |ds1| 
// | a1| 30|INACTIVE|ds1| 
// | a1| 40|INACTIVE|ds1| 
// +---+---+--------+---+ 

Can anyone help me do this?


For the INACTIVE records, does the c4 value change from ds2 to ds1? – Pushkr

Answers


First, a small thing. I used different column names in df2:

val df2 = sc.parallelize(...).toDF("d1","d2","d3","d4") 

Not a big deal, but it made things easier for me to reason about.

Now for the fun stuff. I am going to be a bit verbose for the sake of clarity:

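import org.apache.spark.sql.functions.lit

// Rows of df2 whose key d1 also occurs in df1.c1, with the last column forced to "ds1"
val join = df1
  .join(df2, df1("c1") === df2("d1"), "inner")
  .select($"d1", $"d2", $"d3", lit("ds1").as("d4"))
  .dropDuplicates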

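Here I do the following:

  • Inner join between df1 and df2 on c1 and d1
  • Select the df2 columns and simply hard-code ds1 in the last column to replace ds2
  • Drop duplicates

This basically just filters out everything in df2 that has no corresponding key in c1 of df1.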
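To make the three steps above concrete, here is a minimal plain-Python model of them. It uses no Spark at all: rows are ordinary tuples, and the variable names `pairs`, `selected` and `joined` are made up for illustration. It is only a sketch of the row-level logic, not how Spark executes the join.

```python
# Plain-Python model of the join step (no Spark involved).
# Rows are tuples (key, value, status, source) mirroring the question's data.
df1 = [("a1", 10, "ACTIVE", "ds1"), ("a1", 20, "ACTIVE", "ds1"),
       ("a2", 50, "ACTIVE", "ds1"), ("a3", 60, "ACTIVE", "ds1")]
df2 = [("a1", 10, "ACTIVE", "ds2"), ("a1", 20, "ACTIVE", "ds2"),
       ("a1", 30, "ACTIVE", "ds2"), ("a1", 40, "ACTIVE", "ds2"),
       ("a4", 20, "ACTIVE", "ds2")]

# Inner join on the key: every (left, right) pair whose first field matches.
pairs = [(left, right) for left in df1 for right in df2 if left[0] == right[0]]

# Keep only the right-hand (df2) columns and hard-code "ds1" in the last one.
selected = [(r[0], r[1], r[2], "ds1") for _, r in pairs]

# Drop duplicates: each df2 row showed up once per matching df1 row.
joined = sorted(set(selected))

print(len(pairs))  # number of joined pairs before de-duplication
for row in joined:
    print(row)
```

The join produces one pair per matching combination, which is why the duplicates have to be dropped afterwards: df1 has two a1 rows, so every a1 row of df2 appears twice.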

Next I diff:

// Rows of join that do not appear in df1, marked INACTIVE
val diff = join
  .except(df1)
  .select($"d1", $"d2", lit("INACTIVE").as("d3"), $"d4")

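This is a basic set operation that finds everything in join that is not in df1. These are the items to deactivate, so I select all the columns but replace the third column with a hard-coded INACTIVE value.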
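The set difference can be modeled in plain Python as well. This is only a sketch without Spark, and `joined`, `joined_ds2`, `diff` and `inactive` are illustrative names. The point it tries to show is that the difference compares whole rows, which is why hard-coding ds1 in the join step matters.

```python
# Plain-Python model of the except step (set difference on whole rows).
df1 = [("a1", 10, "ACTIVE", "ds1"), ("a1", 20, "ACTIVE", "ds1"),
       ("a2", 50, "ACTIVE", "ds1"), ("a3", 60, "ACTIVE", "ds1")]

# Output of the join step: df2 rows with a key in df1, last column set to "ds1".
joined = [("a1", 10, "ACTIVE", "ds1"), ("a1", 20, "ACTIVE", "ds1"),
          ("a1", 30, "ACTIVE", "ds1"), ("a1", 40, "ACTIVE", "ds1")]

# A row is removed only if all four fields equal a row of df1.
diff = sorted(set(joined) - set(df1))

# Replace the third field with the hard-coded INACTIVE value.
inactive = [(c1, c2, "INACTIVE", c4) for (c1, c2, _c3, c4) in diff]

# Without the hard-coded "ds1", no row would equal a df1 row,
# so nothing would be removed by the difference.
joined_ds2 = [(c1, c2, c3, "ds2") for (c1, c2, c3, _c4) in joined]
diff_ds2 = sorted(set(joined_ds2) - set(df1))

print(inactive)
print(len(diff_ds2))
```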

All that is left to do is put it all together:

df1.union(diff) 

This just combines df1 with the table of deactivated values we computed earlier to produce the final result:

+---+---+--------+---+ 
| c1| c2|  c3| c4| 
+---+---+--------+---+ 
| a1| 10| ACTIVE|ds1| 
| a1| 20| ACTIVE|ds1| 
| a2| 50| ACTIVE|ds1| 
| a3| 60| ACTIVE|ds1| 
| a1| 30|INACTIVE|ds1| 
| a1| 40|INACTIVE|ds1| 
+---+---+--------+---+ 

Again, you don't need all these intermediate values. I was just being verbose to make the process easier to trace.


'val c1Ids = df1.select("c1").as[String].collect()' 'val joinDf = df1.as("t1").join(df2.as("t2"), df1("c1") === df2("c1"), "rightouter").select($"t2.c1", $"t2.c2").distinct().withColumn("c3", lit("INACTIVE")).withColumn("c4", lit("ds1")).filter(not($"c1").isin(c1Ids: _*))' – Ramesh


'val finalDf = df1.unionAll(joinDf)' This should give the output. – Ramesh


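I am sure there are multiple ways, possibly better than mine, to get that output. Glad to point you in the right direction. Good luck with your project! – Vidya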


Here is a dirty solution:

from pyspark.sql import functions as F 


# find the rows from df2 that have a matching key c1 in df1
df3 = df1.join(df2,df1.c1==df2.c1)\
    .select(df2.c1,df2.c2,df2.c3,df2.c5.alias('c4'))\
    .dropDuplicates()

df3.show() 

+---+---+------+---+ 
| c1| c2| c3| c4| 
+---+---+------+---+ 
| a1| 10|ACTIVE|ds2| 
| a1| 20|ACTIVE|ds2| 
| a1| 30|ACTIVE|ds2| 
| a1| 40|ACTIVE|ds2| 
+---+---+------+---+ 

# Union df3 with df1, drop duplicate (c1, c2) keys, and mark rows that came from df2 (c4 == 'ds2') as INACTIVE

df1.union(df3).dropDuplicates(['c1','c2'])\
    .select('c1','c2',
            F.when(F.col('c4')=='ds2','INACTIVE').otherwise('ACTIVE').alias('c3'),
            F.lit('ds1').alias('c4')  # the original when/otherwise returned 'ds1' in both branches
            )\
    .orderBy('c1','c2')\
    .show()

+---+---+--------+---+ 
| c1| c2|  c3| c4| 
+---+---+--------+---+ 
| a1| 10| ACTIVE|ds1| 
| a1| 20| ACTIVE|ds1| 
| a1| 30|INACTIVE|ds1| 
| a1| 40|INACTIVE|ds1| 
| a2| 50| ACTIVE|ds1| 
| a3| 60| ACTIVE|ds1| 
+---+---+--------+---+ 
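
One caveat about this approach: the keys (a1, 10) and (a1, 20) occur in both df1 and df3, and as far as I know Spark does not guarantee which of the duplicate rows dropDuplicates keeps when it is given a subset of columns. The plain-Python sketch below (no Spark; `dedupe_keep_first` and `relabel` are hypothetical helpers written for this illustration) shows how the result depends on which copy survives.

```python
# Plain-Python model: de-duplicating on (c1, c2) depends on which copy is kept.
df1 = [("a1", 10, "ACTIVE", "ds1"), ("a1", 20, "ACTIVE", "ds1"),
       ("a2", 50, "ACTIVE", "ds1"), ("a3", 60, "ACTIVE", "ds1")]
df3 = [("a1", 10, "ACTIVE", "ds2"), ("a1", 20, "ACTIVE", "ds2"),
       ("a1", 30, "ACTIVE", "ds2"), ("a1", 40, "ACTIVE", "ds2")]

def dedupe_keep_first(rows):
    """Keep the first row seen for each (c1, c2) key."""
    kept = {}
    for row in rows:
        kept.setdefault(row[:2], row)
    return list(kept.values())

def relabel(rows):
    """Mark rows whose source is ds2 as INACTIVE and set the source to ds1."""
    return sorted((c1, c2, "INACTIVE" if c4 == "ds2" else "ACTIVE", "ds1")
                  for (c1, c2, _c3, c4) in rows)

# If the df1 copy of a duplicate key survives, the key stays ACTIVE ...
df1_first = relabel(dedupe_keep_first(df1 + df3))
# ... but if the df3 copy survives, the same key is marked INACTIVE.
df3_first = relabel(dedupe_keep_first(df3 + df1))

print(df1_first[0])
print(df3_first[0])
```

If that matters for your data, an anti-join or except on the key columns avoids the ambiguity, because it removes the df2 copies before the union.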

I enjoyed this challenge. Here is my solution.

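import org.apache.spark.sql.functions.lit
val c1keys = df1.select("c1").distinct
// Rows of df2 whose c1 also occurs in df1
val df2_in_df1 = df2.join(c1keys, Seq("c1"), "inner")
// Of those, keep the rows with no (c1, c2) match in df1 and mark them INACTIVE
val df2inactive = df2_in_df1.join(df1, Seq("c1", "c2"), "leftanti").withColumn("c3", lit("INACTIVE"))
df1.union(df2inactive).show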
+---+---+--------+---+ 
| c1| c2|  c3| c4| 
+---+---+--------+---+ 
| a1| 10| ACTIVE|ds1| 
| a1| 20| ACTIVE|ds1| 
| a2| 50| ACTIVE|ds1| 
| a3| 60| ACTIVE|ds1| 
| a1| 30|INACTIVE|ds2| 
| a1| 40|INACTIVE|ds2| 
+---+---+--------+---+ 
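
For readers who want to check the logic of this answer without Spark, here is a plain-Python sketch of the same two joins. The names `keys`, `df1_pairs` and `result` are made up for the illustration, and it only models the row selection, not Spark's execution.

```python
# Plain-Python model of the key filter followed by a left anti join.
df1 = [("a1", 10, "ACTIVE", "ds1"), ("a1", 20, "ACTIVE", "ds1"),
       ("a2", 50, "ACTIVE", "ds1"), ("a3", 60, "ACTIVE", "ds1")]
df2 = [("a1", 10, "ACTIVE", "ds2"), ("a1", 20, "ACTIVE", "ds2"),
       ("a1", 30, "ACTIVE", "ds2"), ("a1", 40, "ACTIVE", "ds2"),
       ("a4", 20, "ACTIVE", "ds2")]

# Distinct c1 keys of df1, then the df2 rows whose key is among them.
keys = {row[0] for row in df1}
df2_in_df1 = [row for row in df2 if row[0] in keys]

# Left anti join on (c1, c2): keep df2 rows with no matching pair in df1.
df1_pairs = {(row[0], row[1]) for row in df1}
df2inactive = [(c1, c2, "INACTIVE", c5)
               for (c1, c2, _c3, c5) in df2_in_df1
               if (c1, c2) not in df1_pairs]

# Union by position, as in df1.union(df2inactive).
result = df1 + df2inactive
for row in result:
    print(row)
```

As in the output above, the last column of the added rows is still ds2. If ds1 is wanted there, as in the question's expected output, the last field has to be overwritten too, presumably with one more withColumn on the Spark side.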