0
假設我有以下RDD:比較兩種不同的方法在星火:減少和排序
alist = [('a',[['1',2]]),('b',[['2',3]]),('b',[['8',5]]),('b',[['8',5]]),('c',[['4',22]]),('a',[['5',22]])]
anRDD = sc.parallelize(alist)
我的任務是從每串字母得到最高的int值列表(索引1列表)。如果有大量數據和大量不同的密鑰(字符串),建議採用以下哪種方法?
方法1:
import operator
def sortAndTake(alistoflists):
alistoflists.sort(key=operator.itemgetter(1),reverse=True)
return alistoflists[0]
reducedRDD = anRDD.reduceByKey(lambda a,b:a+b)
finalRDD = reducedRDD.map(lambda x: (x[0],sortAndTake(x[1])))
finalRDD.collect()
方法2:
def partitioner(n):
def partitioner_(x):
return portable_hash(x[0]) % n
return partitioner_
def sortIterator(iterator):
oldKey = None
cnt = 0
for item in iterator:
if item[0] != oldKey:
oldKey = item[0]
yield item
partitioned = anRDD.keyBy(lambda kv:(kv[0],kv[1][0][1]))
partitioned.repartitionAndSortWithinPartitions(
numPartitions=2,
partitionFunc=partitioner(2),ascending=False)
.map(lambda x: x[1])
.mapPartitions(sortIterator)
(方法2從接受的回答(通過zero323從先前的問題我已啓發):Using repartitionAndSortWithinPartitions
)
根據我對第一種方法的理解,如果我們得到了大量不同的關鍵值,那麼reduceByKey
中的工人之間會有很多混洗,使方法2更快(我不確定在方法2中使用repartitionAndSortWithinPartitions
時是否發生了同樣的情況)。
任何見解?謝謝:)
再次謝謝你! :D –
我真的很喜歡這種方式,我想知道你提供的答案可以擴展到N個最大值,而不僅僅是最大值。你能否提供一些見解? –
你可以使用['np.partition'](http://docs.scipy.org/doc/numpy/reference/generated/numpy.partition.html#numpy.partition)來獲得topK。注意topK是未排序的。 – ShuaiYuan