2017-09-14 75 views
0
**DF1** **DF2**   **output_DF** 
120 D  A    120 null A 
120 E  B    120 null B 
125 F  C    120 null C 
      D    120 D D 
      E    120 E E 
      F    120 null F 
      G    120 null G 
      H    120 null H 
          125 null A 
          125 null B 
          125 null C 
          125 null D 
          125 null E 
          125 F F 
          125 null G 
          125 null H 

從數據幀1和數據幀2中需要獲取spark-shell中的最終輸出數據幀。 其中A,B,C,D,E,F採用日期格式(yyyy-MM-dd)& 120,125是有成千上萬個ticket_id的ticket_id列。 我剛剛在這裏提取了一個。如何使用scala獲得此信息

+0

您也可以查看'df.join()'函數和可能''df.na.fill()'。 – Shaido

+0

請您詳細說明一下... !! – maduri

+0

[加入密鑰上的Spark數據幀]的可能重複(https://stackoverflow.com/questions/40343625/joining-spark-dataframes-on-the-key) – Harald

回答

0

爲了讓你可以使用df.join()df.na.fill()(如在評論中提到)預期的結果,就像這樣:

火花2.0+

val resultDF = df1.select("col1").distinct.collect.map(_.getInt(0)).map(id => df1.filter(s"col1 = $id").join(df2, df1("col2") === df2("value"), "right").na.fill(id)).reduce(_ union _) 

火花1.6

val resultDF = df1.select("col1").distinct.collect.map(_.getInt(0)).map(id => df1.filter(s"col1 = $id").join(df2, df1("col2") === df2("value"), "right").na.fill(id)).reduce(_ unionAll _) 

它會給你以下結果 -

+---+----+-----+ 
|120|null| A| 
|120|null| B| 
|120|null| C| 
|120| D| D| 
|120| E| E| 
|120|null| F| 
|120|null| G| 
|120|null| H| 
|125|null| A| 
|125|null| B| 
|125|null| C| 
|125|null| D| 
|125|null| E| 
|125| F| F| 
|125|null| G| 
|125|null| H| 
+---+----+-----+ 

我希望它有幫助!

+0

如果列中有N個ID,值只在幾個日期,當加入獨特的日期DF2我們需要得到上述output_DF。爲了更清晰地編輯問題。 @himanshulllTian – maduri

+0

@maduri - 我已根據編輯的問題更新了我的答案。 – himanshuIIITian

+0

:65:error:value union不是org.apache.spark.sql.DataFrame的成員 – maduri

0

全部加入可能的值,然後留下原始數據幀加入:

import hiveContext.implicits._ 
val df1Data = List((120, "D"), (120, "E"), (125, "F")) 
val df2Data = List("A", "B", "C", "D", "E", "F", "G", "H") 
val df1 = sparkContext.parallelize(df1Data).toDF("id", "date") 
val df2 = sparkContext.parallelize(df2Data).toDF("date") 

// get unique ID: 120, 125 
val uniqueIDDF = df1.select(col("id")).distinct() 
val fullJoin = uniqueIDDF.join(df2) 
val result = fullJoin.as("full").join(df1.as("df1"), col("full.id") === col("df1.id") && col("full.date") === col("df1.date"), "left_outer") 

val sorted = result.select(col("full.id"), col("df1.date"), col("full.date")).sort(col("full.id"), col("full.date")) 
sorted.show(false) 

輸出:

+---+----+----+ 
|id |date|date| 
+---+----+----+ 
|120|null|A | 
|120|null|B | 
|120|null|C | 
|120|D |D | 
|120|E |E | 
|120|null|F | 
|120|null|G | 
|120|null|H | 
|125|null|A | 
|125|null|B | 
|125|null|C | 
|125|null|D | 
|125|null|E | 
|125|F |F | 
|125|null|G | 
|125|null|H | 
+---+----+----+ 

排序這裏只是爲了顯示同樣的結果,可以跳過。