2016-08-15 64 views
0

假設我有以下RDD:比較兩種不同的方法在星火:減少和排序

alist = [('a',[['1',2]]),('b',[['2',3]]),('b',[['8',5]]),('b',[['8',5]]),('c',[['4',22]]),('a',[['5',22]])] 
anRDD = sc.parallelize(alist) 

我的任務是從每串字母得到最高的int值列表(索引1列表)。如果有大量數據和大量不同的密鑰(字符串),建議採用以下哪種方法?

方法1:

import operator 

def sortAndTake(alistoflists): 
    alistoflists.sort(key=operator.itemgetter(1),reverse=True) 
    return alistoflists[0] 

reducedRDD = anRDD.reduceByKey(lambda a,b:a+b) 
finalRDD = reducedRDD.map(lambda x: (x[0],sortAndTake(x[1]))) 
finalRDD.collect() 

方法2:

def partitioner(n): 
    def partitioner_(x): 
     return portable_hash(x[0]) % n 
    return partitioner_ 

def sortIterator(iterator): 
    oldKey = None 
    cnt = 0 
    for item in iterator: 
     if item[0] != oldKey: 
      oldKey = item[0] 
      yield item 

partitioned = anRDD.keyBy(lambda kv:(kv[0],kv[1][0][1])) 

partitioned.repartitionAndSortWithinPartitions(
           numPartitions=2, 
           partitionFunc=partitioner(2),ascending=False) 
      .map(lambda x: x[1]) 
      .mapPartitions(sortIterator) 

(方法2從接受的回答(通過zero323從先前的問題我已啓發):Using repartitionAndSortWithinPartitions

根據我對第一種方法的理解,如果我們得到了大量不同的關鍵值,那麼reduceByKey中的工人之間會有很多混洗,使方法2更快(我不確定在方法2中使用repartitionAndSortWithinPartitions時是否發生了同樣的情況)。

任何見解?謝謝:)

回答

2

我的任務是從每個字符串中獲取列表與最高的int值(列表中的索引1)。

如果是這種情況,這兩種方法都非常低效。而不是僅僅reduceByKeymax

from operator import itemgetter 
from functools import partial 

anRDD.mapValues(itemgetter(0)).reduceByKey(partial(max, key=itemgetter(1))) 

關於兩項擬議的方法:

  • 洗牌都相同的數據量。
  • 第一個效率不高groupByKey
+0

再次謝謝你! :D –

+0

我真的很喜歡這種方式,我想知道你提供的答案可以擴展到N個最大值,而不僅僅是最大值。你能否提供一些見解? –

+0

你可以使用['np.partition'](http://docs.scipy.org/doc/numpy/reference/generated/numpy.partition.html#numpy.partition)來獲得topK。注意topK是未排序的。 – ShuaiYuan