2015-10-16 43 views
1

我有一個Spark RDD,看起來像這樣:星火RDD刪除記錄有多個按鍵

[(1, ...), 
(1, ...), 
(2, ...), 
(3, ...)] 

,我試圖刪除具有重複鍵的記錄,在這種情況下,我要排除一切記錄有關鍵'1'。而最終輸出我想應該是這樣

[(2, ...), 
(3, ...)] 

我至今嘗試過,它的工作,但我的直覺說應該有一個更好的解決方案:

>> a = sc.parallelize([(1,[1,1]), (1,[1,1]), (2,[1,1]), (3,[1,1])]) 
>> print a.groupByKey() \ 
    .filter(lambda x: len(x[1])==1) \ 
    .map(lambda x: (x[0], list(x[1])[0])).collect() 
[(2, [1, 1]), (3, [1, 1])] 

誰能幫助我在這?其他

+0

你能或者接受的答案或解釋爲什麼它不工作,因此可以改善?在此先感謝:) – zero323

+0

此外,如果你可以看看這個http://stackoverflow.com/q/33157978/1560062如果你沒有找到有用的答案,我會刪除。 – zero323

回答

1

兩個選項:

  1. subtractByKey - 這需要洗牌所以總成本可以類似於groupByKey。您也可以選擇分區輸入RDDpreservesPartitioning設置爲True

    from operator import add 
    
    counts = (a.keys() 
        .map(lambda x: (x, 1)) 
        .reduceByKey(add)) 
    
    duplicates = (counts 
        .filter(lambda x: x[1] > 1) 
        .map(lambda x: (x[0], None))) 
    
    a.subtractByKey(duplicates) 
    
  2. 廣播變量:

    • 正濾波器 - 如果您預計會有大量的重複的

      non_duplicated = sc.broadcast(set(
          counts.filter(lambda x: x[1] == 1).keys().collect() 
      )) 
      
      a.filter(lambda x: x[0] in non_duplicated.value) 
      
    • 否定過濾器 - 如果預期有少量重複項目

      duplicated = sc.broadcast(set(
          counts.filter(lambda x: x[1] > 1).keys().collect() 
      )) 
      
      a.filter(lambda x: x[0] not in duplicated.value)