我正在Scala上編寫Spark上的程序。它用於計算鍵的數量。下面是數據例如:Spark reduceBykey效果不佳
Name Fruit Place A apple China A apple China A apple U.S A banana U.K B apple Japan B orange Chile C apple French
這是很多列的數據幀,但我只在乎上面的三列,所以可能會有一些重複的記錄。我想算,例如,吃的水果生產名額由A.
val res = data.select("name","fruit","place")
.map(v=>((v.getString(0),v.getString(1)),ArrayBuffer(v.getString(2)))).rdd.reduceByKey((a,b)=>a++=b)
.map(v=>(v._1._1,Map(v._1._2 -> v._2.toSet.size))).reduceByKey((a,b)=>a++=b)
我首先選擇我需要的列,然後使用(「名」,「水果」)爲重點爲每個人所吃的每種水果收集一個ArrayBuffer的生產地點。然後,我使用「名稱」作爲密鑰來收集每個水果的生產地點數量,如{「apple」:2}。因此,結果非正式地像RDD [(「name」,Map(「fruit」 - >「places count」))]。
在程序中,我做了這類關於3次的工作來計算類似於上面例子的信息。例如,計算每個人在一個生產場所中不同水果的數量。
數據大小約爲80GB,我在50個執行程序上運行該作業。每個執行器有4個內核和24GB的內存。此外,數據被重新分區爲200個分區。所以這個工作應該在很短的時間內完成,正如我所料。然而,我花了超過一天運行作業和失敗,因爲org.apache.spark.shuffle.MetadataFetchFailedException:缺少輸出位置洗牌10和java.lang.OutOfMemoryError:GC開銷限制超過。
我做了很多事情來優化此程序,如重置spark.mesos.executor.memoryOverhead並使用可變映射來最小化頻繁創建和清理對象的GC成本。我甚至嘗試使用reduceByKey將具有相同密鑰的數據移入一個分區以提高性能,但幾乎沒有什麼幫助。代碼是這樣的:
val new_data = data.map(v=>(v.getAs[String]("name"),ArrayBuffer((v.getAs[String]("fruit"),v.getAs[String]("place")))))
.rdd.reduceByKey((a,b)=>a++=b).cache()
然後我不需要每次我做相似的計算洗牌數據。後面的工作可以在new_data的基礎上完成。但是,似乎這種優化不起作用。
最後,我發現大約有50%的數據在字段「name」上有相同的值,比如說「H」。我刪除名稱爲「H」的數據,並在1小時內完成作業。
這裏是我的問題:
爲什麼按鍵的分佈對reduceByKey的表現這麼大的影響呢?我使用「分佈」一詞來表示不同鍵的出現次數。在我的情況下,數據的大小並不大,但一個關鍵是數據的主導,所以性能受到很大影響。我認爲這是reduceByKey的問題,我錯了嗎?
如果我必須保留名稱爲「H」的記錄,如何避免性能問題?
是否可以使用reduceByKey重新分區數據並將具有相同密鑰(「名稱」)的記錄放入一個分區?
真的有助於將具有相同密鑰(「名稱」)的記錄移動到一個分區以提高性能?我知道這可能會導致內存問題,但我必須在程序中多次運行類似的代碼,所以我想這可能會有助於後續工作。我對嗎?
感謝您的幫助!
我最好的猜測是,當你通過按鍵減少,全名稱爲「H」的記錄被髮送到一個節點(40GB的數據!),然後一個節點試圖通過這一切(有大量的數據溢出到磁盤上) – bendl
@bendl感謝評論。是的,我認爲是這樣,但即使有40GB的數據,執行程序的內存也是24GB,這意味着它不應該那麼慢(超過1天)。你有什麼想法嗎? – Bin