2014-09-25

How to filter an RDD based on a function of another RDD in Spark?

I am a beginner with Apache Spark. I want to filter out all groups in an RDD whose total weight is greater than a constant. The "weight" map is itself also an RDD. Here is a small demo: the groups to be filtered are stored in "groups", and the constant is 12:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g")) 
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6))) 
val wm = weights.toArray.toMap 
def isheavy(inp: String): Boolean = { 
    val allw = inp.split(",").map(wm(_)).sum 
    allw > 12 
} 
val result = groups.filter(isheavy) 

When the input data is very large, e.g. more than 10 GB, I always hit a "java heap out of memory" error. I suspect it is caused by "weights.toArray.toMap", because that collects the distributed RDD into a plain Java object in the driver JVM. So I tried to filter with the RDD directly:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g")) 
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6))) 
def isheavy(inp: String): Boolean = { 
    val items = inp.split(",") 
    val wm = items.map(x => weights.filter(_._1 == x).first._2) 
    wm.sum > 12 
} 
val result = groups.filter(isheavy) 

When I loaded this script into the spark-shell and ran result.collect, I got a "java.lang.NullPointerException". I was told that this exception occurs when one RDD is manipulated inside an operation on another RDD, and it was suggested that I put the weights into Redis instead.

So how can I get "result" without converting "weights" to a Map, and without putting it into Redis? Is there a solution for filtering an RDD based on another map-like RDD without the help of an external data-store service? Thanks!

Answers


The "java out of memory" error comes up because Spark uses the spark.default.parallelism property to determine the number of splits, which by default is the number of available cores:

// From CoarseGrainedSchedulerBackend.scala 

override def defaultParallelism(): Int = { 
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) 
} 

When the input becomes large and memory is limited, you should increase the number of splits.

You can do something as follows:

val input = List("a,b,c,d", "b,c,e", "a,c,d", "e,g") 
val splitSize = 10000 // specify some number of elements that fit in memory. 

val numSplits = (input.size/splitSize) + 1 // has to be > 0. 
val groups = sc.parallelize(input, numSplits) // specify the # of splits. 

val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)).toMap 

def isHeavy(inp: String) = inp.split(",").map(weights(_)).sum > 12 
val result = groups.filter(isHeavy) 
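If the weight table is too large to ship comfortably inside the filter closure but still fits in driver memory, a broadcast variable (a standard Spark feature, not part of the original answer) sends it to each executor once instead of serializing it with every task. A minimal sketch, assuming the same toy data and an existing SparkContext sc:

```scala
// Broadcast the weight map once to every executor instead of
// shipping it inside each task's closure.
val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)).toMap
val bw = sc.broadcast(weights)

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
// Look weights up through bw.value, so only the broadcast handle is captured.
val result = groups.filter(_.split(",").map(bw.value(_)).sum > 12)
```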

You may also consider increasing the executor memory size via spark.executor.memory.
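For reference, a sketch of setting these properties when constructing the context; the app name is hypothetical, and both properties can equally be passed as command-line configuration to spark-submit:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Raise executor memory and the default number of splits up front.
val conf = new SparkConf()
  .setAppName("heavy-group-filter")           // hypothetical app name
  .set("spark.executor.memory", "4g")         // more heap per executor
  .set("spark.default.parallelism", "64")     // more, smaller splits
val sc = new SparkContext(conf)
```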


Increasing 'spark.executor.memory' does work. – Chad 2014-09-28 03:37:30


This assumes your groups are unique; otherwise, make them unique first with distinct. If either the groups or the weights are small, this is easy. If both the groups and the weights are huge, you can try the following, which may be more scalable but also looks complicated:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g")) 
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6))) 
//map each group to (item, Seq(group)): (a, Seq(a,b,c,d)), (b, Seq(a,b,c,d)), (c, Seq(a,b,c,d)) ... 
val g1 = groups.flatMap(s => s.split(",").map(x => (x, Seq(s)))) 
//join with the weights: j will be (a, (Seq(a,b,c,d), 3)) ... 
val j = g1.join(weights) 
//drop the item key: k will be (Seq(a,b,c,d), 3), (Seq(a,b,c,d), 2) ... 
val k = j.map(x => (x._2._1, x._2._2)) 
//collect the weights per group: l will be (Seq(a,b,c,d), Seq(3,2,5,1)) ... 
val l = k.groupByKey() 
//keep only the groups whose weights sum to more than 12 
val m = l.filter(_._2.sum > 12) 
//we only need the original group strings 
val result = m.map(_._1) 
//don't do this in a real product, otherwise all results go to the driver; use saveAsTextFile etc. instead 
scala> result.foreach(println) 
List(e,g) 
List(b,c,e)
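The RDD pipeline above can be sanity-checked on plain Scala collections, with no SparkContext needed; this is only a local sketch of the same flatMap/join/groupBy/filter logic, not Spark code:

```scala
val groups = List("a,b,c,d", "b,c,e", "a,c,d", "e,g")
val weights = Map("a" -> 3, "b" -> 2, "c" -> 5, "d" -> 1, "e" -> 9, "f" -> 4, "g" -> 6)

// (group, weight) pairs: each item "joined" with its weight
val pairs = groups.flatMap(s => s.split(",").map(x => (s, weights(x))))
// group the weights by their group string and sum them
val sums = pairs.groupBy(_._1).mapValues(_.map(_._2).sum)
// keep only the heavy groups
val heavy = sums.filter(_._2 > 12).keys.toSet
// heavy == Set("b,c,e", "e,g"): 2+5+9 = 16 and 9+6 = 15 both exceed 12
```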