2016-11-29

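MapReduce job using Python's MRJob to produce the top 10 values

I want this MapReduce job (code below) to output the top 10 most popular products. It keeps giving me the following error message: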

it = izip(iterable, count(0,-1))   # decorate
TypeError: izip argument #1 must support iteration

I think it has to do with the nlargest function I am trying to apply.

Any pointers?

Thanks!

from mrjob.job import MRJob
from mrjob.step import MRStep
from heapq import nlargest


class MostRatedProduct(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_ratings,
                   reducer=self.reducer_count_ratings),
            MRStep(reducer=self.reducer_find_top10)
        ]

    def mapper_get_ratings(self, _, line):
        (userID, itemID, rating, timestamp) = line.split(',')
        yield itemID, 1

    def reducer_count_ratings(self, itemID, ratingCount):
        yield None, (sum(ratingCount), itemID)

    def top_10(self, ratingPair):
        for ratingTotal, itemID in ratingPair:
            top_rated = nlargest(10, ratingTotal)
        for top_rated in ratingTotal:
            return (ratingTotal, itemID)

    def reducer_find_top10(self, key, ratingPair):
        ratingTotal, itemID = self.top_10(ratingPair)
        yield ratingTotal, itemID


if __name__ == '__main__':
    MostRatedProduct.run()

Answers


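I haven't used mrjob, but I have used MapReduce on an AWS cluster to find top values before. Here is my code, which does not use heapq. Hopefully you can apply the same concept to your code. Here is the mapper function: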

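import sys


def Parser():
    # Each input line is expected to hold a word and its count,
    # separated by whitespace.
    for line in sys.stdin:
        line = line.strip('\n')
        yield line.split()


def mapper():
    counts = list(Parser())
    # Sort by count (ascending) and keep the 10 largest rows.
    z = sorted(counts, key=lambda x: int(x[1]))[-10:]
    # Emit one "word<TAB>count" line per row.
    print('\n'.join(map(lambda x: '\t'.join(x), z)))


if __name__ == '__main__':
    mapper()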
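
The mapper above keeps a local top 10 by sorting every row. As a possible alternative, `heapq.nlargest` can pick the same rows without a full sort. The sketch below is only an illustration: the function name `local_top_n` and the sample lines are made up for this example, and unlike the sorted slice it returns the rows largest first.

```python
import heapq


def local_top_n(lines, n=10):
    """Return the n rows with the largest counts, largest first.

    Each line is assumed to look like "word count" (whitespace separated),
    matching what the mapper's Parser() expects.
    """
    # Skip blank lines, split the rest into [word, count] rows.
    rows = (line.split() for line in lines if line.strip())
    # The key converts the count field to int so "12" ranks above "7".
    return heapq.nlargest(n, rows, key=lambda row: int(row[1]))


if __name__ == '__main__':
    # Hypothetical sample input, not taken from the original data.
    sample = ["the 50", "cat 7", "mat 12", "sat 3"]
    for word, count in local_top_n(sample, n=2):
        print(word + '\t' + count)
```

In a streaming mapper, `sys.stdin` could be passed as `lines`.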

Here is the reducer:

import sys


def Parser():
    # Each input line is "word<TAB>count", as emitted by the mapper.
    for line in sys.stdin:
        yield tuple(line.strip('\n').split('\t'))


def reducer():
    # Read every row exactly once. Calling Parser() again inside a
    # groupby loop over Parser() would silently drop the first row.
    counts = list(Parser())
    # Sort by count (ascending) and keep the 10 largest rows overall.
    z = sorted(counts, key=lambda x: int(x[1]))[-10:]
    print('\n'.join(map(lambda x: '\t'.join(x), z)))


if __name__ == '__main__':
    reducer()
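
The two scripts rely on one idea: if every mapper keeps only its own top N rows, the global top N must be among the survivors, so the reducer only needs to rank that much smaller set. The sketch below checks this on made-up data with N = 3. It assumes each word reaches exactly one mapper with its final count, and that counts are distinct so ties cannot change which rows are kept. The helper name `top_n` is hypothetical.

```python
def top_n(rows, n):
    """Return the n rows with the largest integer count, smallest first."""
    return sorted(rows, key=lambda row: int(row[1]))[-n:]


# Hypothetical (word, count) rows seen by two separate mappers.
chunk_a = [('the', '50'), ('cat', '7'), ('sat', '3'), ('mat', '12')]
chunk_b = [('a', '40'), ('dog', '9'), ('ran', '2'), ('far', '15')]

# Each mapper keeps only its local top 3 ...
local_a = top_n(chunk_a, 3)
local_b = top_n(chunk_b, 3)

# ... and the reducer takes the top 3 of the combined survivors.
merged = top_n(local_a + local_b, 3)

# Same answer as ranking everything in one place.
assert merged == top_n(chunk_a + chunk_b, 3)
print(merged)
```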

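I adapted the code to output the top 10 words. Keep in mind that this is a word-count example, where I parsed a text document. I hope this helps in some way!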
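
Applied to the job in the question, the TypeError most likely comes from `nlargest(10, ratingTotal)`: `ratingTotal` is a single integer, while `nlargest` expects an iterable. Passing the whole stream of `(ratingTotal, itemID)` pairs should avoid it. The sketch below is a stand-alone version of the second-step reducer with no mrjob import, so it can be run on its own; the sample data is made up. Inside the `MRJob` class the same body would presumably take `self` as its first argument.

```python
from heapq import nlargest


def reducer_find_top10(key, rating_pairs):
    """Stand-alone sketch of the second-step reducer.

    rating_pairs is an iterable of (ratingTotal, itemID) pairs, all sent
    to the same key (None) by reducer_count_ratings.
    """
    # nlargest takes the whole iterable of pairs, not a single integer.
    # Pairs compare by their first element (the count) first.
    for rating_total, item_id in nlargest(10, rating_pairs):
        yield rating_total, item_id


if __name__ == '__main__':
    # Hypothetical sample data: (ratingTotal, itemID) pairs.
    sample = [(3, 'a'), (12, 'b'), (7, 'c'), (1, 'd')]
    for total, item in reducer_find_top10(None, iter(sample)):
        print(total, item)
```

Because a generator can yield many times, the reducer emits one line per product, instead of returning a single pair as `top_10` did.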