2013-03-17 83 views
0

I am trying to write a Hadoop map/reduce class that reads a text file containing a list of actors and the movies they have acted in (one movie per line) and returns the number of movies each actor has appeared in; essentially, sorting Hadoop results by frequency, similar to a word count.

Ultimately, I want the results sorted by the number of movies (ascending or descending is fine). However, my code appears to sort the results by the number of characters in the movie title. I have tried everything I can think of, including swapping the output from (Text, IntWritable) to (IntWritable, Text) and using different comparators, but I cannot get the results sorted by movie count.

I'm sure this is something very simple, but I cannot figure it out for the life of me. Any suggestions would be greatly appreciated.

An excerpt from the data file:

Chan, Jackie (I) The Forbidden Kingdom 2008 
Chan, Jackie (I) Kung Fu Panda 2 2011 
Chan, Jackie (I) Shanghai Noon 2000 
Chan, Jackie (I) Pik lik for 1995 
Chan, Jackie (I) The Karate Kid 2010 
Chan, Jackie (I) Shanghai Knights 2003 
Chan, Jackie (I) Around the World in 80 Days 2004 
Chan, Jackie (I) Rush Hour 1998 
Chan, Jackie (I) The Tuxedo 2002 
Chan, Jackie (I) Kung Fu Panda 2008 
Chan, Jackie (I) Rush Hour 2 2001 
Chan, Jackie (I) Rush Hour 3 2007 
Davi, Robert Licence to Kill 1989 
Davi, Robert Die Hard 1988 
Davi, Robert The Hot Chick 2002 
Davi, Robert The Goonies 1985 

My code is below:

// MovieCountByActor.java 

package ucsc.hadoop.homework2; 

import java.io.IOException; 
import java.nio.ByteBuffer; 

import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 
// import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparator; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

import ucsc.hadoop.util.ConfigurationUtil; 

public class MovieCountByActor extends Configured implements Tool { 
private static final Log LOG = LogFactory.getLog(MovieCountByActor.class); 

    public int run(String[] args) throws Exception { 
     // Configuration conf = getConf(); 
     JobConf conf = new JobConf(getConf(), MovieCountByActor.class); 
     conf.setOutputKeyComparatorClass(CountSort.class); 
     conf.setOutputValueGroupingComparator(CountSort.class); 

     if (args.length != 2) { 
      System.err.println("Usage: moviecountbyactor <in> <out>"); 
      System.exit(2); 
     } 

     ConfigurationUtil.dumpConfigurations(conf, System.out); 

     LOG.info("input: " + args[0] + " output: " + args[1]); 

     Job job = new Job(conf, "movie count"); 
     job.setJarByClass(MovieCountByActor.class); 
     job.setMapperClass(MovieTokenizerMapper.class); 
     job.setReducerClass(MovieCountReducer.class); 

     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(IntWritable.class); 

     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(IntWritable.class); 
     job.setSortComparatorClass(CountSort.class); 

     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     boolean result = job.waitForCompletion(true); 
     return (result) ? 0 : 1; 
    } 

    public static void main(String[] args) throws Exception { 
     int exitCode = ToolRunner.run(new MovieCountByActor(), args); 
     System.exit(exitCode); 
    } 

    public static class MovieTokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { 
     private final static IntWritable ONE = new IntWritable(1); 
     private final static Text ACTOR = new Text(); 

     @Override 
     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
      String[] tokens = value.toString().split("\\t"); 

      String actor = ""; 
      if (tokens.length == 3) { 
       actor = tokens[0]; 
       ACTOR.set(actor); 
       context.write(ACTOR, ONE); 
      } 
     } 
    } 

    public static class MovieCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 
     private IntWritable result = new IntWritable(); 

     public void reduce(Text actor, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 

      int movieCountPerActor = 0; 
      for (IntWritable count : values) { 
       movieCountPerActor += count.get(); 
      } 
      result.set(movieCountPerActor); 
      context.write(actor, result); 
     } 
    } 

    public static class CountSort extends WritableComparator { 
     protected CountSort() { 
      super (IntWritable.class); 
     } 

     @Override 
     public int compare(byte[] b1, int j1, int k1, byte[] b2, int j2, int k2) { 
      Integer a = ByteBuffer.wrap(b1, j1, k1).getInt(); 
      Integer b = ByteBuffer.wrap(b2, j2, k2).getInt(); 
      return a.compareTo(b) * -1; 
     } 
    } 

} 

Answers

1

I think you are confusing what job.setSortComparatorClass(CountSort.class); is doing - this comparator is applied to your keys before they are handed to the reducer. I think you are just interpreting part of the serialized Text object (the actor's name) as an int, which explains why you are seeing the output ordered by the length of the actor's name (and I imagine you would see unexpected results if two actors with names of the same length landed on the same reduce instance).
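
To make the failure mode concrete, here is a small standalone snippet (not from the original post) that serializes a couple of the actor names the same way the map output does and feeds the bytes to the same ByteBuffer.getInt() call that CountSort uses. Because a serialized Text begins with a length prefix, the resulting integer is dominated by the length of the name, which matches the ordering described above.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.Text;

public class WhatCountSortSees {
    public static void main(String[] args) throws Exception {
        for (String name : new String[] {"Chan, Jackie (I)", "Davi, Robert"}) {
            // Serialize the Text key the way Hadoop writes map output keys:
            // a varint length prefix followed by the UTF-8 bytes of the name.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new Text(name).write(new DataOutputStream(bytes));

            // Mirrors the comparator in the question: getInt() reads the length
            // prefix plus the first characters of the name, so the comparison
            // tracks name length, not movie count.
            System.out.println(name + " -> " + ByteBuffer.wrap(bytes.toByteArray()).getInt());
        }
    }
}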

To sort the output by the number of movies, you will need to run a second M/R job that takes the output of the first job (the movie count per actor) and uses a mapper to swap the key and value (so the output key is the count and the value is the actor's name). With a single reducer you will then get the actors in ascending order of movie count.
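
A minimal sketch of that second job is below, assuming the first job's output lines look like actor<TAB>count; the class names (MovieCountSortJob, SwapMapper, and so on) are illustrative and not from the original code. The mapper swaps key and value, a descending comparator orders the counts largest first, and the reducer writes the pairs back out as (actor, count).

package ucsc.hadoop.homework2;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MovieCountSortJob {

    // Reads "actor<TAB>count" lines from the first job and emits (count, actor)
    // so the shuffle sorts on the movie count.
    public static class SwapMapper extends Mapper<Object, Text, IntWritable, Text> {
        private final IntWritable count = new IntWritable();
        private final Text actor = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\\t");
            if (fields.length == 2) {
                actor.set(fields[0]);
                count.set(Integer.parseInt(fields[1]));
                context.write(count, actor);
            }
        }
    }

    // Inverts the natural IntWritable order so the largest counts come first.
    public static class DescendingIntComparator extends WritableComparator {
        protected DescendingIntComparator() {
            super(IntWritable.class, true);
        }

        @SuppressWarnings({"rawtypes", "unchecked"})
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -a.compareTo(b);
        }
    }

    // With a single reduce task, writing the pairs back out as (actor, count)
    // preserves the global ordering established by the shuffle.
    public static class SwapReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
        @Override
        public void reduce(IntWritable count, Iterable<Text> actors, Context context)
                throws IOException, InterruptedException {
            for (Text actor : actors) {
                context.write(actor, count);
            }
        }
    }
}

In the driver for this job you would set job.setMapOutputKeyClass(IntWritable.class), job.setMapOutputValueClass(Text.class), job.setSortComparatorClass(MovieCountSortJob.DescendingIntComparator.class), and job.setNumReduceTasks(1) so that a single, globally ordered output file is produced.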

+1

Either use a single reducer, or if the data is huge, use a custom range partitioner with multiple reducers. – 2013-03-18 01:35:10
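
For the multi-reducer variant mentioned in this comment, a custom range partitioner along these lines could be plugged in with job.setPartitionerClass(...); this is only a rough sketch, and MAX_EXPECTED_COUNT is a made-up bound that would in practice come from sampling the data or from the job configuration.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CountRangePartitioner extends Partitioner<IntWritable, Text> {
    // Hypothetical upper bound on movies per actor.
    private static final int MAX_EXPECTED_COUNT = 1000;

    @Override
    public int getPartition(IntWritable count, Text actor, int numPartitions) {
        // Higher counts go to lower-numbered partitions, so with a descending
        // sort comparator the reducer output files are globally ordered when
        // read in partition order.
        int capped = Math.min(count.get(), MAX_EXPECTED_COUNT);
        int bucket = (MAX_EXPECTED_COUNT - capped) * numPartitions / (MAX_EXPECTED_COUNT + 1);
        return Math.min(bucket, numPartitions - 1);
    }
}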

+0

@chris-white, any example of how to run multiple M/R jobs in sequence? I am also interested in aggregating the frequent values. – feder 2013-11-26 18:04:48

+2

@feder - it would be best to post a new question with more detail – 2013-11-26 18:13:16

0

By default MapReduce sorts the reducer's output keys, so after counting the movies for a particular actor you can make the reducer's output key the movie count and the value the actor's name.

As follows:

public static class MovieCountReducer extends Reducer<Text, IntWritable, IntWritable, Text> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text actor, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int movieCountPerActor = 0;
        for (IntWritable count : values) {
            movieCountPerActor += count.get();
        }
        result.set(movieCountPerActor);
        context.write(result, actor);
    }
}

Also, don't forget to make the following changes in the job configuration:

job.setMapOutputKeyClass(Text.class); 
job.setMapOutputValueClass(IntWritable.class); 
job.setOutputKeyClass(IntWritable.class); 
job.setOutputValueClass(Text.class);