2016-07-23 146 views

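Reduce Spark RDD to return multiple values

I have the following RDD containing sets of items that I want to group by similarity. Items in the same set are considered similar, similarity is transitive, and items in all sets that share at least one common item are also considered similar.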

Input RDD:

Set(w1, w2) 
Set(w1, w2, w3, w4) 
Set(w5, w2, w6) 
Set(w7, w8, w9) 
Set(w10, w5, w8) --> All elements of the first 5 sets are similar, since each of these sets shares at least one item with another of them
Set(w11, w12, w13) 

I want to reduce the above RDD to:

Set(w1, w2, w3, w4, w5, w6, w7, w8, w9, w10) 
Set(w11, w12, w13) 

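Any suggestions on how I can do this? I can't do something like the following, where I would skip reducing two sets if they don't contain any common element: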

data.reduce((a, b) => if (a.intersect(b).size > 0) a ++ b else (a, b))  // the else branch returns a tuple, not a Set, so this does not compile

Thanks.

Answer


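Your reduce algorithm is actually incorrect. For example, a set may not be mergeable with the next set, yet still be mergeable with another set elsewhere in the collection.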
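To make that failure concrete, here is a minimal local sketch in plain Scala (no Spark) of a merge that does handle it: each incoming set absorbs every existing group it overlaps, so a later set that bridges two earlier groups merges them. This is a swapped-in fold-based approach, not the GraphX method used below, and `MergeOverlappingSets` / `mergeOverlapping` are hypothetical names for illustration.

```scala
object MergeOverlappingSets {
  // Merge sets transitively. Invariant: the groups in the accumulator are pairwise disjoint.
  def mergeOverlapping[A](sets: Seq[Set[A]]): List[Set[A]] =
    sets.foldLeft(List.empty[Set[A]]) { (groups, s) =>
      // Split existing groups into those sharing an item with s and those that don't
      val (overlapping, disjoint) = groups.partition(g => g.exists(s.contains))
      // Union s with every overlapping group into one new group
      overlapping.foldLeft(s)(_ ++ _) :: disjoint
    }

  def main(args: Array[String]): Unit = {
    // The input from the question
    val data = Seq(
      Set("w1", "w2"),
      Set("w1", "w2", "w3", "w4"),
      Set("w5", "w2", "w6"),
      Set("w7", "w8", "w9"),
      Set("w10", "w5", "w8"),
      Set("w11", "w12", "w13")
    )
    mergeOverlapping(data).foreach(println)
  }
}
```

Because `mergeOverlapping(a ++ b)` yields the same grouping however the input is split, it could in principle act as a `reduce` combiner over sets wrapped as single-element lists, e.g. `rdd.map(s => List(s)).reduce((a, b) => MergeOverlappingSets.mergeOverlapping(a ++ b))`. That is only a sketch: the whole intermediate list of groups moves through the reduce and each set is compared against every group, so it suits only modest data sizes.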

There may be a better way, but I would solve this by turning it into a graph problem and using GraphX.

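// Assumes spark-shell, where `sc` is the SparkContext
import org.apache.spark.graphx.{Edge, Graph}

val data = Array(Set("w1", "w2", "w3"), Set("w5", "w6"), Set("w7"), Set("w2", "w3", "w4"))
val setRdd = sc.parallelize(data).cache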

// Generate a unique id for each item, to use as its vertex id in the graph
val itemToId = setRdd.flatMap(_.toSeq).distinct.zipWithUniqueId.cache
val idToItem = itemToId.map { case (item, itemId) => (itemId, item) }

// Convert to an RDD of sets of item ids: tag each item with its set's id,
// look up the item's id, then regroup by set id
val newSetRdd = setRdd.zipWithUniqueId
  .flatMap { case (set, setId) =>
    set.map { item => (item, setId) }
  }
  .join(itemToId)  // (item, (setId, itemId))
  .values          // (setId, itemId)
  .groupByKey()
  .values          // item ids belonging to one set

// Create an RDD containing the edges of the graph
val edgeRdd = newSetRdd.flatMap { set =>
  val seq = set.toSeq
  val head = seq.head
  // Add an edge from the first item to each item in the set,
  // including itself, so all items of one set share a component
  seq.map { item => Edge[Long](head, item) }
}

val graph = Graph.fromEdges(edgeRdd, Nil) 

// Run the connected components algorithm to find which items are similar:
// items in the same component are similar
val verticesRDD = graph.connectedComponents().vertices

// (vertexId, componentId) joined with (vertexId, item), then grouped by component
verticesRDD.join(idToItem).values.groupByKey.values.collect.foreach(println)
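To check the GraphX result on small data without a cluster, the same idea (link the first item of each set to every item in it, then take connected components) can be sketched on a single machine with a union-find (disjoint-set) structure. This is a swapped-in technique, not how GraphX works internally: GraphX's `connectedComponents` labels each vertex with the lowest vertex id in its component. `UnionFindGrouping` and `groupByComponent` are hypothetical names.

```scala
import scala.collection.mutable

object UnionFindGrouping {
  // Hypothetical helper: group items into connected components with union-find instead of GraphX
  def groupByComponent[A](sets: Seq[Set[A]]): Seq[Set[A]] = {
    val parent = mutable.Map.empty[A, A]

    // Find the root of x, compressing the path (path halving) as we go
    def find(x: A): A = {
      var cur = x
      while (parent(cur) != cur) {
        parent(cur) = parent(parent(cur))
        cur = parent(cur)
      }
      cur
    }

    // Merge the components containing a and b
    def union(a: A, b: A): Unit = {
      val ra = find(a)
      val rb = find(b)
      if (ra != rb) parent(ra) = rb
    }

    // Every item starts in its own component
    for (s <- sets; x <- s) parent.getOrElseUpdate(x, x)

    // Mirror the GraphX edges: link the first item of each set to every item in it
    for (s <- sets if s.nonEmpty) {
      val head = s.head
      s.foreach(item => union(head, item))
    }

    // Materialize the keys first, since find mutates `parent`, then group items by root
    val items = parent.keys.toList
    items.groupBy(x => find(x)).values.map(_.toSet).toSeq
  }

  def main(args: Array[String]): Unit = {
    // Same sample data as the GraphX example above
    val data = Seq(Set("w1", "w2", "w3"), Set("w5", "w6"), Set("w7"), Set("w2", "w3", "w4"))
    groupByComponent(data).foreach(println)
  }
}
```

Union-find runs in near-linear time but keeps every item in one in-memory map, so it is only a local cross-check; for data that does not fit on one machine, the GraphX version above is the appropriate route.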

Excellent, thanks. I've never explored Spark's GraphX library; now is the time. – soontobeared