2017-07-14 72 views
0

我正在使用Spark實現以下邏輯。將數據幀拆分成更小的數據幀並將大數據幀推送給所有執行者?

  1. 獲取50K行表的結果。
  2. 獲取另一張表格(約30K行)。
  3. 對於(1)和(2)之間的所有組合,做一些工作並獲得價值。

將(2)的數據幀推送到所有執行程序和分區(1)並在每個執行程序上運行每個部分?如何實現它?

val getTable(t String) = 
    sqlContext.read.format("jdbc").options(Map(
    "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver", 
    "url" -> jdbcSqlConn, 
    "dbtable" -> s"$t" 
)).load() 
    .select("col1", "col2", "col3") 

val table1 = getTable("table1") 
val table2 = getTable("table2") 

// Split the rows in table1 and make N, say 32, data frames 
val partitionedTable1 : List[DataSet[Row]] = splitToSmallerDFs(table1, 32) // How to implement it? 

val result = partitionedTable1.map(x => { 
    val value = doWork(x, table2) // Is it good to send table2 to executors like this? 
    value 
}) 

問:

  1. 如何破解大數據幀分成小的數據幀? (重新分區?)
  2. 向這樣的執行器發送table2(傳遞一個大數據幀作爲參數)是否很好?

回答

1

如何破解大數據幀分成小的數據幀? (再分配?)

簡單的答案是肯定的repartion可以是一個解決辦法。

問題可能是,將數據幀重新分區到更小的分區改善整體操作?

數據幀已經在本質上分佈了。這意味着您在數據框上執行的操作(如連接,groupBy,聚合,函數等等)都在數據所在的位置執行。但如加入操作,GROUPBY,在需要洗牌聚集,重新分區將是無效的

  1. GROUPBY操作將打亂數據幀,使得不同的羣體將是相同的執行人。

  2. partitionBy在窗口函數執行方式GROUPBY

  3. 聯接操作將在相同的方式混洗數據相同。

是好送表2(通過一個大的數據幀作爲參數)這樣的執行者?

它不是很好地傳遞數據幀。由於您正在傳輸數據幀,所以table2對執行者不可見。

我會建議你使用broadcast variable

你可以做如下

val table2 = sparkContext.broadcast(getTable("table2")) 
val result = partitionedTable1.map(x => { 
    val value = doWork(x, table2.value) 
    value 
}) 
+0

感謝。你的意思是'val result = table1.map(x => {'(not'partitionedTable1')在你的答案中代碼的第二行?由於數據框會默認分發給所有執行者?不需要手動分割它 – ca9163d9

+0

nope。那不是我的意思,我建議使用dataframe作爲廣播變量,並在其他函數中訪問它,而不是將數據幀作爲參數傳遞。 –

+0

我需要'splitToSmallerDFs()'來將'table1'拆分爲'partitionedTable1:List [DataSet [Row]]'?是否將'table1.map(...)'的執行分配給所有執行者? – ca9163d9