2016-11-08 94 views
2

在進行研究時,我發現刪除Spark RDD中的所有子集有點困難。數據結構爲RDD[(key,set)]。例如,它可能是:如何有效地刪除火花RDD中的子集

RDD[ ("peter",Set(1,2,3)), ("mike",Set(1,3)), ("jack",Set(5)) ]

自組麥克風(Set(1,3))的是彼得(Set(1,2,3))的一個子集,我想刪除「邁克」,這將結束與

RDD[ ("peter",Set(1,2,3)), ("jack",Set(5)) ]

用python本地執行兩個「for」循環操作很容易。但是當我想用scala和spark來擴展到雲時,找到一個好的解決方案並不容易。

謝謝

+0

那你有關係嗎? '(「peter」,Set(1,2,3))'&'(「olga」,Set(1,2,3))' – maasg

+0

只需刪除其中一個。保持哪一個並不重要。 –

+0

提供的解決方案保留了兩者。您被邀請適應您的特定需求。 – maasg

回答

1

我懷疑我們可以逃逸到各元件相互比較(雙環流的在非分佈式算法的等效) 。集合之間的子集操作不是自反的,這意味着我們需要比較is "alice" subsetof "bob"is "bob" subsetof "alice"

爲此使用Spark API,我們可以訴諸使用笛卡爾積與本身的數據乘以和驗證結果矩陣的每個條目:

val data = Seq(("peter",Set(1,2,3)), ("mike",Set(1,3)), ("anne", Set(7)),("jack",Set(5,4,1)), ("lizza", Set(5,1)), ("bart", Set(5,4)), ("maggie", Set(5))) 
// expected result from this dataset = peter, olga, anne, jack 
val userSet = sparkContext.parallelize(data) 
val prod = userSet.cartesian(userSet) 
val subsetMembers = prod.collect{case ((name1, set1), (name2,set2)) if (name1 != name2) && (set2.subsetOf(set1)) && (set1 -- set2).nonEmpty => (name2, set2) } 
val superset = userSet.subtract(subsetMembers)  

// lets see the results: 
superset.collect() 
// Array[(String, scala.collection.immutable.Set[Int])] = Array((olga,Set(1, 2, 3)), (peter,Set(1, 2, 3)), (anne,Set(7)), (jack,Set(5, 4, 1))) 
-3

您可以在地圖後使用過濾器。

您可以構建一個地圖,該地圖將爲您想要刪除的內容返回一個值。首先建立一個功能:

def filter_mike(line): 
    if line[1] != Set(1,3): 
     return line 
    else: 
     return None 

那麼你現在可以過濾這樣的:

your_rdd.map(filter_mike).filter(lambda x: x != None) 

這將工作

+0

再次閱讀問題。 – shanmuga

1

這可以通過使用RDD.fold功能來實現。
在這種情況下,所需的輸出是超集項目的「List」(ItemList)。對於此輸入也應被轉換爲「列表」(ITEMLIST的RDD)

import org.apache.spark.rdd.RDD 

// type alias for convinience 
type Item = Tuple2[String, Set[Int]] 
type ItemList = List[Item] 

// Source RDD 
val lst:RDD[Item] = sc.parallelize(List(("peter",Set(1,2,3)), ("mike",Set(1,3)), ("jack",Set(5)))) 


// Convert each element as a List. This is needed for using fold function on RDD 
// since the data-type of the parameters are the same as output parameter 
// data-type for fold function 
val listOflst:RDD[ItemList] = lst.map(x => List(x)) 

// for each element in second ItemList 
// - Check if it is not subset of any element in first ItemList and add first 
// - Remove the subset of newly added elements 
def combiner(first:ItemList, second:ItemList) : ItemList = { 
    def helper(lst: ItemList, i:Item) : ItemList = { 
     val isSubset: Boolean = lst.exists(x=> i._2.subsetOf(x._2)) 
     if(isSubset) lst else i :: lst.filterNot(x => x._2.subsetOf(i._2)) 
    } 
    second.foldLeft(first)(helper) 
} 


listOflst.fold(List())(combiner)