2017-04-11 85 views
0

我不明白下面的代碼有什麼問題。這工作正常,並且hashmap typeMap得到更新,如果我的輸入數據框未分區。但是,如果下面的代碼在分區環境中執行,則typeMap始終爲空且不會更新。這段代碼有什麼問題?感謝你的幫助。斯卡拉hashmap沒有被追加

var typeMap = new mutable.HashMap[String, (String, Array[String])] 
case class Combiner(,,,,,,, mapTypes: mutable.HashMap[String, (String, Array[String])]) { 
    def execute() { 
     <...> 
     val combinersResult = dfInput.rdd.aggregate(combiners.toArray) (incrementCount, mergeCount) 
    } 

    def updateTypes(arr: Array[String], tempMapTypes:mutable.HashMap[String, (String, Array[String])]): Unit = { 
     <...> 
     typeMap ++= tempMapTypes 
    } 

    def incrementCount(combiners: Array[Combiner], row: Row): Array[Combiner] = { 
     for (i <- 0 until row.length) { 
      val array = getMyType(row(i), tempMapTypes) 
      combiners(i). updateTypes(array, tempMapTypes) 
     } 
     combiners 
} 
+0

嗨,有什麼線索嗎? – Garipaso

回答

2

在分佈式計算中使用可變值是一個非常糟糕的主意。特別是使用Spark時,RDD操作從驅動程序交付給執行程序,並在集羣中的所有不同機器上並行執行。對您的mutable.HashMap所做的更新永遠不會發回給驅動程序,所以您首先遇到了在驅動程序中構建的空白地圖。

因此,您需要完全重新考慮您的數據結構,因爲它們更喜歡不變性,並且要記住,在執行程序上執行的操作是獨立且平行的。

+0

非常感謝回覆,我現在明白了,但在這裏繼續的理想方式是什麼?我需要更新的地圖進行進一步處理。 – Garipaso

+0

如果您只是在您對RDD進行操作時創建帶有附加值的新地圖,則不需要將地圖設置爲可變,但是如果您必須使用可變地圖,則至少需要在操作中對其進行初始化,以便每臺機器都有其地圖自己的副本。 – Vidya

+0

再次感謝,我已經嘗試過,更新每個分區中的地圖,然後使用combine()從所有分區追加地圖,但結果耗時,有沒有更簡單的方法來實現這一點? – Garipaso