2017-07-24 64 views
0

我有一個數據幀,看起來像這樣:星火合併套常見的元素

+-----------+-----------+ 
| Package | Addresses | 
+-----------+-----------+ 
| Package 1 | address1 | 
| Package 1 | address2 | 
| Package 1 | address3 | 
| Package 2 | address3 | 
| Package 2 | address4 | 
| Package 2 | address5 | 
| Package 2 | address6 | 
| Package 3 | address7 | 
| Package 3 | address8 | 
| Package 4 | address9 | 
| Package 5 | address9 | 
| Package 5 | address1 | 
| Package 6 | address10 | 
| Package 7 | address8 | 
+-----------+-----------+ 

我需要找到被視爲一起在不同的軟件包的所有地址。輸出示例:

+----+------------------------------------------------------------------------+ 
| Id |        Addresses        | 
+----+------------------------------------------------------------------------+ 
| 1 | [address1, address2, address3, address4, address5, address6, address9] | 
| 2 | [address7, address8]             | 
| 3 | [address10]               | 
+----+------------------------------------------------------------------------+ 

所以,我有DataFrame。我被package分組它(而不是分組):

val rdd = packages.select($"package", $"address"). 
    map{ 
    x => { 
     (x(0).toString(), x(1).toString()) 
    } 
    }.rdd.combineByKey(
    (source) => { 
    Set[String](source) 
    }, 

    (acc: Set[String], v) => { 
    acc + v 
    }, 

    (acc1: Set[String], acc2: Set[String]) => { 
    acc1 ++ acc2 
    } 
) 

然後,我合併具有共同地址行:

val result = rdd.treeAggregate(
    Set.empty[Set[String]] 
)(
    (map: Set[Set[String]], row) => { 
    val vals = row._2 
    val sets = map + vals 

    // copy-paste from here https://stackoverflow.com/a/25623014/772249 
    sets.foldLeft(Set.empty[Set[String]])((cum, cur) => { 
     val (hasCommon, rest) = cum.partition(_ & cur nonEmpty) 
     rest + (cur ++ hasCommon.flatten) 
    }) 
    }, 
    (map1, map2) => { 
    val sets = map1 ++ map2 

    // copy-paste from here https://stackoverflow.com/a/25623014/772249 
    sets.foldLeft(Set.empty[Set[String]])((cum, cur) => { 
     val (hasCommon, rest) = cum.partition(_ & cur nonEmpty) 
     rest + (cur ++ hasCommon.flatten) 
    }) 
    }, 
    10 
) 

但是,無論我做什麼,treeAggregate正在很長,我不能完成單一任務。原始數據大小約爲250GB。我嘗試過不同的羣集,但treeAggregate花費的時間太長。

treeAggregate之前的所有內容都很好用,但之後就會出現問題。

我試過了不同的spark.sql.shuffle.partitions(默認值是2000,10000),但它似乎並不重要。

我試過不同depthtreeAggregate,但沒有注意到區別。

相關問題:

  1. Merge Sets of Sets that contain common elements in Scala
  2. Spark complex grouping
+0

我不知道我明白你想要做什麼。爲什麼不這樣做:packages.groupBy(「packages」)。agg(collect_set(「address」))? –

+0

@AssafMendelson因爲它會給我完全不同的結果比我需要。請仔細觀察預期結果。如果我會分組,我會得到7個不同的結果,但我預計只有三個。 – twoface88

+0

@AssafMendelson示例:address4和address1屬於一起,即使它們屬於不同的包,因爲地址3已經在package1和package2中看到。因此,來自package1和package2的所有地址都屬於同一個地址,依此類推。 – twoface88

回答

3

看看你的數據,就好像它在這裏的地址是頂點的圖,他們有一個連接,如果有包兩個都。那麼解決您的問題將是圖的connected components

Sparks gpraphX庫具有優化函數以查找connected components。它將返回不同連接組件中的頂點,將它們視爲每個連接組件的ID。

然後有了id,你可以收集連接到它的所有其他地址,如果需要的話。

看看this article他們如何使用圖表來實現與您相同的分組。

+0

這非常有趣,謝謝。 – twoface88