2017-06-19 321 views
1

只是爲了學習的目的,我試圖設置一個字典作爲累加器中的全局變量添加功能效果很好,但我運行代碼並將字典放在地圖函數中,它總是返回空。pyspark中的累加器與字典作爲全局變量

但是設置列表作爲一個全局變量

class DictParam(AccumulatorParam): 
    def zero(self, value = ""): 
     return dict() 

    def addInPlace(self, acc1, acc2): 
     acc1.update(acc2) 


if __name__== "__main__": 
    sc, sqlContext = init_spark("generate_score_summary", 40) 
    rdd = sc.textFile('input') 
    #print(rdd.take(5)) 



    dict1 = sc.accumulator({}, DictParam()) 


    def file_read(line): 
     global dict1 
     ls = re.split(',', line) 
     dict1+={ls[0]:ls[1]} 
     return line 


    rdd = rdd.map(lambda x: file_read(x)).cache() 
    print(dict1) 
+0

我的問題是地圖總是空的 – user3341953

回答

1

我相信print(dict1())只是在rdd.map()之前執行。

火花,有兩種類型的operations

  • 轉變,即描述未來計算
  • 和行動,也呼籲採取行動,實際上觸發執行

累加器僅在some action is executed時更新:

累加器不會更改Spark的懶惰評估模型。如果他們 正在RDD上的操作內更新,則其值僅爲 ,因爲RDD是作爲操作的一部分計算的。

如果檢查出的文檔的本節結束時,有一個例子恰好喜歡你:

accum = sc.accumulator(0) 
def g(x): 
    accum.add(x) 
    return f(x) 
data.map(g) 
# Here, accum is still 0 because no actions have caused the `map` to be computed. 

所以,你會需要添加一些動作,例如:

rdd = rdd.map(lambda x: file_read(x)).cache() # transformation 
foo = rdd.count() # action 
print(dict1) 

請確保檢查各種RDD功能和累加器特性的細節,因爲這可能會影響結果的正確性。 (例如,rdd.take(n)默認爲only scan one partition,不是整個數據集。)

+0

謝謝,我現在會嘗試。 – user3341953

1

對於範圍內的行爲進行蓄能器的更新類似的代碼只,其價值是 只更新一次該RDD計算作爲行動的一部分

+0

謝謝你的迴應。我不太明白,爲什麼在我的代碼中,字典作爲全局變量沒有更新,一直是空的?我厭倦列表案例,它運作良好。你能解釋更多嗎?在此先感謝 – user3341953