在spark DF中使用dropDuplicates函數時將保留哪一行?這在火花文檔中沒有說明。dropDuplicates操作符中使用了哪些行?
- (按行排列)
- 保持最後一個保持第一(根據行順序)
- 隨機?
p.s.假設在分佈式環境紗(未掌握本地)
在spark DF中使用dropDuplicates函數時將保留哪一行?這在火花文檔中沒有說明。dropDuplicates操作符中使用了哪些行?
p.s.假設在分佈式環境紗(未掌握本地)
TL; DR保持優先(根據行順序)
dropDuplicates
操作者在火花SQL creates a logical plan with Deduplicate
operator。
這Deduplicate
操作is translated to First
logical operator火花SQL的催化劑優化,這很好地回答你的問題(!)
你可以看到Deduplicate
運營商在下面的邏輯計劃。
// create datasets with duplicates
val dups = spark.range(9).map(_ % 3)
val q = dups.dropDuplicates
以下是q
數據集的邏輯計劃。然後
scala> println(q.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#64L], false
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
Deduplicate
操作被轉換爲First
邏輯運算符(即示出了本身作爲優化後Aggregate
操作者)。
scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [value#64L], [value#64L]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
花一些時間審查阿帕奇星火的代碼後,我似乎已說服自己,dropDuplicates
運營商正是groupBy
其次first功能(!)
第一(columnName:String,ignoreNulls:Boolean):列集合函數:返回組中列的第一個值。
import org.apache.spark.sql.functions.first
val firsts = dups.groupBy("value").agg(first("value") as "value")
scala> println(firsts.queryExecution.logical.numberedTreeString)
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
scala> firsts.explain
== Physical Plan ==
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)])
+- Exchange hashpartitioning(value#64L, 200)
+- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)])
+- *SerializeFromObject [input[0, bigint, false] AS value#64L]
+- *MapElements <function1>, obj#63: bigint
+- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
+- *Range (0, 9, step=1, splits=8)
我可疑該dropDuplicates
操作者可以是更好的性能(如groupBy
通常是中可能的解決方案的最慢)。
似乎是一個潛在的性能改進是有一個無序/隨機dropDuplicates選項,即沒有執行第一個 – Qmage
@Qmage我不知道如果第一個需要排序。我很懷疑。感謝您的發現。感謝您接受它作爲答案!讚賞。 –
@JacekLaskowski你想知道如何選擇一個隨機值而不是第一個值嗎? – belka