2017-10-21 63 views
3

我正在用databricks筆記本上的Spark數據集API編寫應用程序。如何平均分配數據集以避免歪斜的連接(和長時間運行的任務)?

我有2個表格。一個是15億行,其次是250萬。兩個表都包含電信數據,並且使用國家代碼和數字的前5位完成連接。產量有550億行。問題是我有偏斜的數據(長時間運行的任務)。無論我如何重新分配數據集,由於散列鍵的分佈不均勻,我會得到長時間運行的任務。

我試着用廣播連接,試圖堅持在內存等大表分區.....

什麼是我選擇這裏?

+0

示例數據集/代碼段,你嘗試過,但沒有成功需要 –

+0

你說你嘗試過「廣播連接」,如果它的超出自動路由,那麼它不會被廣播散列連接。它在內部選擇一些其他聯接(可能是sortmerge或shuffle聯接)。不要'smallDF.join(largeDF)'它應該'largeDF.join(smallDF)'和這個小DF應該適合內存,這樣它會被廣播給所有執行者 –

+0

@RamGhadiyaram我試過使用顯式廣播連接。問題是數據偏斜。我無法在數據集上創建自定義分區程序,因此只有可用的方法是重新分區(num_part,join_key1,join_key2),其中join_key1是國家代碼,join_key2是5位數字前綴。但是,這不起作用,因爲數據分佈不均勻...... – StStojanovic

回答

2

火花將重新分區基於連接鍵的數據,所以重新分區前加入不會改變歪斜,如果你知道這是造成歪斜的關鍵(只添加不必要的洗牌)

(一般會有一些像空或0或「」),將您的數據分成2部分 - 1個數據集與歪斜鍵,和另一個與其他

並在子數據集上進行連接,並結合結果

例如:

val df1 = ... 
val df2 = ... 
val skewKey = null 

val df1Skew = df1.where($"key" === skewKey) 
val df2Skew = df2.where($"key" === skewKey) 

val df1NonSkew = df1.where($"key" =!= skewKey) 
val df2NonSkew = df2.where($"key" =!= skewKey) 

val dfSkew = df1Skew.join(df2Skew) //this is a cross join 
val dfNonSkew = df1NonSkew.join(df2NonSkew, "key") 

val res = dfSkew.union(dfNonSkew) 
相關問題