2017-08-24 65 views
1

Apache Spark如何檢測重複行?Apache Spark如何檢測重複項?它可以被修改嗎?

的原因,我問的是,我想有一個稍微不同的行爲:

在設定的用於重複檢測列,對於他們中的一些(這是double型)我想是重複檢測基於兩個值之間的差異低於某個閾值(由我指定)。

我想這可能會使用crossJoin()與適當的where聲明後,但是,我希望有一個更優雅的解決方案?

謝謝!

回答

1

它使用HashArggregate

scala> df.distinct.explain 
== Physical Plan == 
*HashAggregate(keys=[x#12], functions=[]) 
+- Exchange hashpartitioning(x#12, 200) 
    +- *HashAggregate(keys=[x#12], functions=[]) 
     +- LocalTableScan [x#12] 

我希望一個更優雅的解決方案?

您可以嘗試近似加入由LSH運營商提供:

但它是不可能與單一功能的工作。

您可以對窗口函數使用類似會話的方法,但只有在可以將數據劃分爲多個分區時纔有用。如果你是罰款近似可以使用固定大小的範圍,然後申請我所描述的方法在Spark - Window with recursion? - Conditionally propagating values across rows

sort隨後與mapPartitions另一種近似可以實現。

df.sortBy("someColumn").rdd.mapPartitions(drop_duplicates).toDF() 

其中dropDuplicates可以實施類似於:

def drop_duplicates(xs): 
    prev = None 
    for x in xs: 
     if prev is None or abs(x - prev) > threshold: 
      yield x 
     prev = x 

隨着一點點努力,你可以把它在分區邊界一致爲好。

+0

感謝您的好和有趣的指針。問題是:我試圖得到一個*確切的*解決方案。對於一個近似的解決方案,我可以根據閾值進行乘/除/整,然後完成。順便說一句:我有一個'groupID'列可以用於'Window.partitionBy('groupID')'。 –

相關問題