2017-06-23 54 views
3

在spark DF中使用dropDuplicates函數時將保留哪一行?這在火花文檔中沒有說明。dropDuplicates操作符中使用了哪些行?

  1. (按行排列)
  2. 保持最後一個保持第一(根據行順序)
  3. 隨機?

p.s.假設在分佈式環境紗(未掌握本地)

回答

4

TL; DR保持優先(根據行順序)

dropDuplicates操作者在火花SQL creates a logical plan with Deduplicate operator

Deduplicate操作is translated to First logical operator火花SQL的催化劑優化,這很好地回答你的問題(!)

你可以看到Deduplicate運營商在下面的邏輯計劃。

// create datasets with duplicates 
val dups = spark.range(9).map(_ % 3) 

val q = dups.dropDuplicates 

以下是q數據集的邏輯計劃。然後

scala> println(q.queryExecution.logical.numberedTreeString) 
00 Deduplicate [value#64L], false 
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L] 
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint 
03  +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long 
04   +- Range (0, 9, step=1, splits=Some(8)) 

Deduplicate操作被轉換爲First邏輯運算符(即示出了本身作爲優化後Aggregate操作者)。

scala> println(q.queryExecution.optimizedPlan.numberedTreeString) 
00 Aggregate [value#64L], [value#64L] 
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L] 
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint 
03  +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long 
04   +- Range (0, 9, step=1, splits=Some(8)) 

花一些時間審查阿帕奇星火的代碼後,我似乎已說服自己,dropDuplicates運營商正是groupBy其次first功能(!)

第一(columnName:String,ignoreNulls:Boolean):列集合函數:返回組中列的第一個值。

import org.apache.spark.sql.functions.first 
val firsts = dups.groupBy("value").agg(first("value") as "value") 
scala> println(firsts.queryExecution.logical.numberedTreeString) 
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139] 
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L] 
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint 
03  +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long 
04   +- Range (0, 9, step=1, splits=Some(8)) 

scala> firsts.explain 
== Physical Plan == 
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)]) 
+- Exchange hashpartitioning(value#64L, 200) 
    +- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)]) 
     +- *SerializeFromObject [input[0, bigint, false] AS value#64L] 
     +- *MapElements <function1>, obj#63: bigint 
      +- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long 
       +- *Range (0, 9, step=1, splits=8) 

可疑dropDuplicates操作者可以是更好的性能(如groupBy通常是中可能的解決方案的最慢)。

+0

似乎是一個潛在的性能改進是有一個無序/隨機dropDuplicates選項,即沒有執行第一個 – Qmage

+0

@Qmage我不知道如果第一個需要排序。我很懷疑。感謝您的發現。感謝您接受它作爲答案!讚賞。 –

+0

@JacekLaskowski你想知道如何選擇一個隨機值而不是第一個值嗎? – belka

相關問題