我有一個數據幀,看起來像這樣:星火合併套常見的元素
+-----------+-----------+
| Package | Addresses |
+-----------+-----------+
| Package 1 | address1 |
| Package 1 | address2 |
| Package 1 | address3 |
| Package 2 | address3 |
| Package 2 | address4 |
| Package 2 | address5 |
| Package 2 | address6 |
| Package 3 | address7 |
| Package 3 | address8 |
| Package 4 | address9 |
| Package 5 | address9 |
| Package 5 | address1 |
| Package 6 | address10 |
| Package 7 | address8 |
+-----------+-----------+
我需要找到被視爲一起在不同的軟件包的所有地址。輸出示例:
+----+------------------------------------------------------------------------+
| Id | Addresses |
+----+------------------------------------------------------------------------+
| 1 | [address1, address2, address3, address4, address5, address6, address9] |
| 2 | [address7, address8] |
| 3 | [address10] |
+----+------------------------------------------------------------------------+
所以,我有DataFrame。我被package
分組它(而不是分組):
val rdd = packages.select($"package", $"address").
map{
x => {
(x(0).toString(), x(1).toString())
}
}.rdd.combineByKey(
(source) => {
Set[String](source)
},
(acc: Set[String], v) => {
acc + v
},
(acc1: Set[String], acc2: Set[String]) => {
acc1 ++ acc2
}
)
然後,我合併具有共同地址行:
val result = rdd.treeAggregate(
Set.empty[Set[String]]
)(
(map: Set[Set[String]], row) => {
val vals = row._2
val sets = map + vals
// copy-paste from here https://stackoverflow.com/a/25623014/772249
sets.foldLeft(Set.empty[Set[String]])((cum, cur) => {
val (hasCommon, rest) = cum.partition(_ & cur nonEmpty)
rest + (cur ++ hasCommon.flatten)
})
},
(map1, map2) => {
val sets = map1 ++ map2
// copy-paste from here https://stackoverflow.com/a/25623014/772249
sets.foldLeft(Set.empty[Set[String]])((cum, cur) => {
val (hasCommon, rest) = cum.partition(_ & cur nonEmpty)
rest + (cur ++ hasCommon.flatten)
})
},
10
)
但是,無論我做什麼,treeAggregate
正在很長,我不能完成單一任務。原始數據大小約爲250GB。我嘗試過不同的羣集,但treeAggregate
花費的時間太長。
treeAggregate
之前的所有內容都很好用,但之後就會出現問題。
我試過了不同的spark.sql.shuffle.partitions
(默認值是2000,10000),但它似乎並不重要。
我試過不同depth
爲treeAggregate
,但沒有注意到區別。
相關問題:
我不知道我明白你想要做什麼。爲什麼不這樣做:packages.groupBy(「packages」)。agg(collect_set(「address」))? –
@AssafMendelson因爲它會給我完全不同的結果比我需要。請仔細觀察預期結果。如果我會分組,我會得到7個不同的結果,但我預計只有三個。 – twoface88
@AssafMendelson示例:address4和address1屬於一起,即使它們屬於不同的包,因爲地址3已經在package1和package2中看到。因此,來自package1和package2的所有地址都屬於同一個地址,依此類推。 – twoface88