2013-02-25 50 views
0

我的一個mapper會生成一些分佈在文件中的日誌,如part-0,part-1,part-2等。現在,每個日誌都有一些查詢和一些關聯的數據:Map Reduce針對常見查詢聚合的簡單分數

part-0 

q    score   
1 ben 10  4.01 
horse shoe 5.96 
... 

part-1 

1 ben 10  3.23 
horse shoe  2.98 
.... 

and so on for part-2,3 etc. 

現在同樣的查詢q即「1本10」上方駐留在部分1,部分2等

現在我必須寫一個地圖減少相位其中我可以收集相同查詢和彙總(加起來)他們的分數。

我的映射器函數可以是一個身份,並在減少我將完成此任務。

輸出將是:

q  aggScore 
1 ben 10 7.24 
horse shoe 8.96 
... 

似乎是一個簡單的任務,但我不能想到的,我怎麼可以這樣進行(閱讀了很多,但不是真的能夠進行)。我可以考慮通用算法問題,其中首先我將收集常見查詢並將其分數相加。

任何幫助pythonic解決方案或算法(地圖減少)的一些提示將非常感激。

+0

[你嘗試過什麼?](http://whathaveyoutried.com)什麼方法是用來從文件讀取,存儲數據,操作它,然後將其顯示給用戶? – 2013-02-25 08:07:56

+0

我正在使用Hadoop Streaming。輸入來自標準輸入和標準輸出。 – Dominix 2013-02-25 08:10:11

回答

1

這裏是MapReduce的溶液:

地圖輸入:每個輸入文件(部分0,部分1,部分2,...)可以被輸入到各個(分離)映射任務。

foreach在輸入文件中的輸入行, 映射器發出<q,aggScore>。如果單個文件中的查詢有多個分數,則Map會將它們總和,否則,如果我們知道每個查詢只會在每個文件中出現一次,則映射可以是每個輸入行按原樣發出<q,aggScore>的標識函數。

減速機輸入形式爲<q,list<aggScore1,aggScore2,...>減速機操作類似於着名的MapReduce示例wordcount。如果您正在使用Hadoop,則可以對Reducer使用以下方法。

public void reduce(Text q, Iterable<IntWritable> aggScore, Context context) throws IOException, InterruptedException { 
    int sum = 0; 
    for (IntWritable val : aggScore) { 
     sum += val.get(); 
    } 
    context.write(q, new IntWritable(sum)); 
} 

的方法將總結所有aggScores特定q,給你所需的輸出。對於減速的Python代碼應該是這個樣子(這裏q是關鍵和aggScores列表是值):

def reduce(self, key, values, output, reporter): 
    sum = 0 
    while values.hasNext(): 
     sum += values.next().get() 
    output.collect(key, IntWritable(sum))