我是Apache Spark的初學者。我想過濾出所有在RDD中權重總和大於常數的組。 「體重」圖也是RDD。這裏是一個小尺寸的演示中,待過濾的被存儲在「基團」的基團,所述恆定值是12:如何根據基於Spark中另一個RDD的函數過濾RDD?
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
val allw = inp.split(",").map(wm(_)).sum
allw > 12
}
val result = groups.filter(isheavy)
當輸入數據是非常大的,> 10GB例如,我總是遇到「java堆內存不足」錯誤。我懷疑它是否是由「weights.toArray.toMap」引起的,因爲它將分佈式RDD轉換爲JVM中的Java對象。所以,我想用RDD直接進行過濾:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
val items = inp.split(",")
val wm = items.map(x => weights.filter(_._1 == x).first._2)
wm.sum > 12
}
val result = groups.filter(isheavy)
當我加載這個腳本到火花後殼跑result.collect
,我得到了一個「顯示java.lang.NullPointerException」錯誤。有人告訴我,在另一個RDD中操作RDD時,會出現空指針異常,並建議我將權重放入Redis。
那麼我怎樣才能得到「結果」沒有轉換「權重」地圖,或把它放到Redis?如果有一種解決方案可以在沒有外部數據存儲服務的幫助下基於另一個類似地圖的RDD過濾RDD? 謝謝!
放大'spark.executor.memory'確實有效。 – Chad 2014-09-28 03:37:30