2016-08-12 300 views
1

我在尋找與此問題等價的Pyspark:How to get the number of elements in partition?計算每個pyspark RDD分區中的元素數

具體而言,我想以編程方式計算pyspark RDD或數據框的每個分區中元素的數量(我知道此信息在Spark Web UI中可用)。

這種嘗試的結果「AttributeError的:‘NoneType’對象有沒有屬性‘_jvm’」:

df.foreachPartition(lambda iter: sum(1 for _ in iter))

我不想收集迭代器的內容到內存中。

回答

4

如果您問:我們可以在迭代器中獲取元素的數量而無需迭代它嗎?答案是No

但是我們沒有將其存儲在內存中,如在後你所提到的:

def count_in_a_partition(idx, iterator): 
    count = 0 
    for _ in iterator: 
    count += 1 
    return idx, count 

data = sc.parallelize([ 
    1, 2, 3, 4 
], 4) 

data.mapPartitionsWithIndex(count_in_a_partition).collect() 

編輯

請注意,你的代碼是非常接近的解決方案,只是mapPartitions需要返回一個迭代器:

def count_in_a_partition(iterator): 
    yield sum(1 for _ in iterator) 

data.mapPartitions(count_in_a_partition).collect() 
+0

謝謝@ShuaiYuan。不,我知道我必須重複計算。 您的第一個解決方案適用於我! 但是,第二個仍然會引發與我在Spark 1.5.0(我的組織集羣)中的初始嘗試相同的AttributeError,即使在您的示例中創建的「數據」rdd中也是如此。 AttributeError:'NoneType'對象沒有屬性'_jvm'。但是,在運行1.6.0或1.5.2的Spark Community Edition中,您的兩個解決方案都可以工作。也許我的本地CDH發行版有些奇怪? –

+0

可能是。不幸的是我沒有一個Spark 1.5.0來測試。 – ShuaiYuan