2017-08-11 105 views
0

我正在Scala上編寫Spark上的程序。它用於計算鍵的數量。下面是數據例如:Spark reduceBykey效果不佳

 Name  Fruit   Place 
A  apple   China 
A  apple   China 
A  apple   U.S 
A  banana  U.K 
B  apple   Japan 
B  orange  Chile 
C  apple   French

這是很多列的數據幀,但我只在乎上面的三列,所以可能會有一些重複的記錄。我想算,例如,吃的水果生產名額由A.

val res = data.select("name","fruit","place") 
.map(v=>((v.getString(0),v.getString(1)),ArrayBuffer(v.getString(2)))).rdd.reduceByKey((a,b)=>a++=b) 
.map(v=>(v._1._1,Map(v._1._2 -> v._2.toSet.size))).reduceByKey((a,b)=>a++=b) 

我首先選擇我需要的列,然後使用(「名」,「水果」)爲重點爲每個人所吃的每種水果收集一個ArrayBuffer的生產地點。然後,我使用「名稱」作爲密鑰來收集每個水果的生產地點數量,如{「apple」:2}。因此,結果非正式地像RDD [(「name」,Map(「fruit」 - >「places count」))]。

在程序中,我做了這類關於3次的工作來計算類似於上面例子的信息。例如,計算每個人在一個生產場所中不同水果的數量。

數據大小約爲80GB,我在50個執行程序上運行該作業。每個執行器有4個內核和24GB的內存。此外,數據被重新分區爲200個分區。所以這個工作應該在很短的時間內完成,正如我所料。然而,我花了超過一天運行作業和失敗,因爲org.apache.spark.shuffle.MetadataFetchFailedException:缺少輸出位置洗牌10java.lang.OutOfMemoryError:GC開銷限制超過

我做了很多事情來優化此程序,如重置spark.mesos.executor.memoryOverhead並使用可變映射來最小化頻繁創建和清理對象的GC成本。我甚至嘗試使用reduceByKey將具有相同密鑰的數據移入一個分區以提高性能,但幾乎沒有什麼幫助。代碼是這樣的:

val new_data = data.map(v=>(v.getAs[String]("name"),ArrayBuffer((v.getAs[String]("fruit"),v.getAs[String]("place"))))) 
.rdd.reduceByKey((a,b)=>a++=b).cache() 

然後我不需要每次我做相似的計算洗牌數據。後面的工作可以在new_data的基礎上完成。但是,似乎這種優化不起作用。

最後,我發現大約有50%的數據在字段「name」上有相同的值,比如說「H」。我刪除名稱爲「H」的數據,並在1小時內完成作業。

這裏是我的問題:

  1. 爲什麼按鍵的分佈對reduceByKey的表現這麼大的影響呢?我使用「分佈」一詞來表示不同鍵的出現次數。在我的情況下,數據的大小並不大,但一個關鍵是數據的主導,所以性能受到很大影響。我認爲這是reduceByKey的問題,我錯了嗎?

  2. 如果我必須保留名稱爲「H」的記錄,如何避免性能問題?

  3. 是否可以使用reduceByKey重新分區數據並將具有相同密鑰(「名稱」)的記錄放入一個分區?

  4. 真的有助於將具有相同密鑰(「名稱」)的記錄移動到一個分區以提高性能?我知道這可能會導致內存問題,但我必須在程序中多次運行類似的代碼,所以我想這可能會有助於後續工作。我對嗎?

感謝您的幫助!

+0

我最好的猜測是,當你通過按鍵減少,全名稱爲「H」的記錄被髮送到一個節點(40GB的數據!),然後一個節點試圖通過這一切(有大量的數據溢出到磁盤上) – bendl

+0

@bendl感謝評論。是的,我認爲是這樣,但即使有40GB的數據,執行程序的內存也是24GB,這意味着它不應該那麼慢(超過1天)。你有什麼想法嗎? – Bin

回答

0

你可以做什麼來避免大洗牌是首先做一個從水果到地方的數據框架。

val fruitToPlaces = data.groupBy("fruit").agg(collect_set("place").as("places")) 

該數據幀要小(即適合在內存中) 你做fruitToPlaces.cache.count,以確保它的確定

然後你做水果加入。

data.join(fruitToPlaces, Seq("fruit"), "left_outer") 

星火應該足夠聰明,做一個散列連接(而不是一個洗牌加入)

+0

感謝您的評論,但它似乎與我想要的有點不同。就你而言,吃過同種水果的傢伙總是會得到同樣的地方。然而,事實並非如此,有人可能只是從美國購買蘋果,而不是其他地方。 – Bin

相關問題