1
Apache Spark如何檢測重複行?Apache Spark如何檢測重複項?它可以被修改嗎?
的原因,我問的是,我想有一個稍微不同的行爲:
在設定的用於重複檢測列,對於他們中的一些(這是double
型)我想是重複檢測基於兩個值之間的差異低於某個閾值(由我指定)。
我想這可能會使用crossJoin()
與適當的where
聲明後,但是,我希望有一個更優雅的解決方案?
謝謝!
Apache Spark如何檢測重複行?Apache Spark如何檢測重複項?它可以被修改嗎?
的原因,我問的是,我想有一個稍微不同的行爲:
在設定的用於重複檢測列,對於他們中的一些(這是double
型)我想是重複檢測基於兩個值之間的差異低於某個閾值(由我指定)。
我想這可能會使用crossJoin()
與適當的where
聲明後,但是,我希望有一個更優雅的解決方案?
謝謝!
它使用HashArggregate
:
scala> df.distinct.explain
== Physical Plan ==
*HashAggregate(keys=[x#12], functions=[])
+- Exchange hashpartitioning(x#12, 200)
+- *HashAggregate(keys=[x#12], functions=[])
+- LocalTableScan [x#12]
我希望一個更優雅的解決方案?
您可以嘗試近似加入由LSH運營商提供:
但它是不可能與單一功能的工作。
您可以對窗口函數使用類似會話的方法,但只有在可以將數據劃分爲多個分區時纔有用。如果你是罰款近似可以使用固定大小的範圍,然後申請我所描述的方法在Spark - Window with recursion? - Conditionally propagating values across rows
與sort
隨後與mapPartitions
另一種近似可以實現。
df.sortBy("someColumn").rdd.mapPartitions(drop_duplicates).toDF()
其中dropDuplicates
可以實施類似於:
def drop_duplicates(xs):
prev = None
for x in xs:
if prev is None or abs(x - prev) > threshold:
yield x
prev = x
隨着一點點努力,你可以把它在分區邊界一致爲好。
感謝您的好和有趣的指針。問題是:我試圖得到一個*確切的*解決方案。對於一個近似的解決方案,我可以根據閾值進行乘/除/整,然後完成。順便說一句:我有一個'groupID'列可以用於'Window.partitionBy('groupID')'。 –