2016-09-06 50 views
0

是否有任何有效的方法可以從RDD中獲取前1000個項目,並將其從RDD中移除?Spark - Take並減去

目前我在做什麼是:

small_array = big_sorted_rdd.take(1000) 
big_sorted_rdd_without_small_array = big_sorted_rdd.subtract(
    sc.parallize(small_array)) 

回答

0

這不是因爲RDD做一個簡單的事情被分配(如名稱狀態),所以定義1000第一項是不是直線前進。

然而,除了你的建議之外,還有很多方法可以實現你在做的事情。

首先,您可以定義項目的順序,例如將每個項目轉換爲元組:(number,item),其中編號從1到#個元素。然後使用上RDD過濾器切斷第一X項目:

big_rdd.filter(_._1 > 1000) 

二的想法,我需要擺脫X第一要素的時間是執行過的項目迭代計算(記得有一次我試着減少系統負載)。所以,你可以做的是你的數據分割成每塊1000元的部分,然後取一大塊的時間和在其上執行你的計算,使RDD是這樣的:

RDD[(chunkNumber, List[elements])] 

,那麼你將工作在每次1000元,以這樣的想法還可以讓你的數據集是這樣的:

Array[RDD[elements]] 

然後在RDD工作每次(迭代這個數組)1000元

0

VAL zippedRDD = RDD的。 zipWithIndex()

VAL neededRDD = rdd.filter(T => t._2 < 1000)

VAL unNeededRDD = rdd.filter(T => t._2> = 1000)