2017-06-05 87 views
0

我知道有不同的方法來計算文本或列表中的元素數量。但我試圖理解爲什麼這個不起作用。我想寫一個等效代碼使用Spark計算文本或列表中元素的數量

A_RDD=sc.parallelize(['a', 1.2, []]) 

acc = sc.accumulator(0) 
acc.value 
A_RDD.foreach(lambda _: acc.add(1)) 
acc.value 

其結果爲3 要做到這一點,我定義稱爲MY_COUNT(_)下面的函數,但我不知道如何獲得的結果。 A_RDD.foreach(my_count)不會執行任何操作。我也沒有收到任何錯誤。我做錯了什麼?

counter = 0 #function that counts elements 
def my_count(_): 
    global counter 
    counter += 1 

A_RDD.foreach(my_count) 
+0

瑪麗,你能否確認發佈的答案是否能澄清你的問題? –

回答

1

A_RDD.foreach(my_count)操作不會在您的本地Python虛擬機上運行。它運行在遠程執行程序節點中。因此,驅動器會將my_count方法與變量counter一起發送到每個執行程序節點,因爲該方法引用該變量。因此每個執行器節點都有自己的定義counter變量,該變量通過foreach方法更新,而在驅動程序中定義的變量counter不會增加。

一個簡單但危險的解決方案是收集驅動程序上的RDD,然後像下面那樣計算計數。這是有風險的,因爲整個RDD內容被下載到可能導致MemoryError的驅動程序的內存中。

>>> len(A_RDD.collect()) 
3 
0

那麼,如果你是在本地運行而不是在集羣上運行。在spark/scala中,這種行爲在局部和clust之間變化。它將具有預期的本地值,但在羣集中它不會具有與您所描述的相同的值......在spark/python中會發生同樣的情況嗎?我的猜測是這樣。