Sorting Hadoop results (word-count style) by frequency

I am trying to write a Hadoop map/reduce class that reads a text file containing a list of actors and the movies they have appeared in (one movie per line) and returns the number of movies each actor has appeared in.

Ultimately, I want the results sorted by movie count (ascending or descending, either is fine). However, my code appears to sort the results by the number of characters in the movie title instead. I have tried everything I can think of, including swapping the output from (Text, IntWritable) to (IntWritable, Text) and using different comparators, but I cannot get the results sorted by movie count.

I'm sure it's something very simple, but I can't figure it out for the life of me. Any suggestions would be greatly appreciated.
An excerpt from the data file:
Chan, Jackie (I) The Forbidden Kingdom 2008
Chan, Jackie (I) Kung Fu Panda 2 2011
Chan, Jackie (I) Shanghai Noon 2000
Chan, Jackie (I) Pik lik for 1995
Chan, Jackie (I) The Karate Kid 2010
Chan, Jackie (I) Shanghai Knights 2003
Chan, Jackie (I) Around the World in 80 Days 2004
Chan, Jackie (I) Rush Hour 1998
Chan, Jackie (I) The Tuxedo 2002
Chan, Jackie (I) Kung Fu Panda 2008
Chan, Jackie (I) Rush Hour 2 2001
Chan, Jackie (I) Rush Hour 3 2007
Davi, Robert Licence to Kill 1989
Davi, Robert Die Hard 1988
Davi, Robert The Hot Chick 2002
Davi, Robert The Goonies 1985
My code is as follows:
// MovieCountByActor.java
package ucsc.hadoop.homework2;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
// import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import ucsc.hadoop.util.ConfigurationUtil;

public class MovieCountByActor extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(MovieCountByActor.class);

    public int run(String[] args) throws Exception {
        // Configuration conf = getConf();
        JobConf conf = new JobConf(getConf(), MovieCountByActor.class);
        conf.setOutputKeyComparatorClass(CountSort.class);
        conf.setOutputValueGroupingComparator(CountSort.class);

        if (args.length != 2) {
            System.err.println("Usage: moviecountbyactor <in> <out>");
            System.exit(2);
        }

        ConfigurationUtil.dumpConfigurations(conf, System.out);
        LOG.info("input: " + args[0] + " output: " + args[1]);

        Job job = new Job(conf, "movie count");
        job.setJarByClass(MovieCountByActor.class);
        job.setMapperClass(MovieTokenizerMapper.class);
        job.setReducerClass(MovieCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setSortComparatorClass(CountSort.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        return (result) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MovieCountByActor(), args);
        System.exit(exitCode);
    }

    public static class MovieTokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final static Text ACTOR = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\\t");
            String actor = "";
            if (tokens.length == 3) {
                actor = tokens[0];
                ACTOR.set(actor);
                context.write(ACTOR, ONE);
            }
        }
    }

    public static class MovieCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text actor, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int movieCountPerActor = 0;
            for (IntWritable count : values) {
                movieCountPerActor += count.get();
            }
            result.set(movieCountPerActor);
            context.write(actor, result);
        }
    }

    public static class CountSort extends WritableComparator {
        protected CountSort() {
            super(IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int j1, int k1, byte[] b2, int j2, int k2) {
            Integer a = ByteBuffer.wrap(b1, j1, k1).getInt();
            Integer b = ByteBuffer.wrap(b2, j2, k2).getInt();
            return a.compareTo(b) * -1;
        }
    }
}
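For what it's worth: the sort comparator runs on the *map output key*, which in this job is the actor name (a `Text`), not a count. So `CountSort.compare()` ends up decoding the leading bytes of a serialized string as if they were an integer. A plain-Java sketch of that misinterpretation (no Hadoop required; the class and method names here are illustrative only, and `Text`'s length prefix is ignored for simplicity):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BytesAsInt {
    // Interpret the first four UTF-8 bytes of a string as a big-endian int,
    // which is roughly what CountSort.compare() does when the key bytes it
    // receives are actually a serialized string rather than an IntWritable.
    static int leadingInt(String s) {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.wrap(b, 0, b.length).getInt();
    }

    public static void main(String[] args) {
        // "Chan" -> bytes 0x43 0x68 0x61 0x6E -> an arbitrary large integer
        // that has nothing to do with any movie count.
        System.out.println(leadingInt("Chan, Jackie (I)"));
        System.out.println(leadingInt("Davi, Robert"));
    }
}
```

The resulting order therefore reflects the key's bytes (effectively the string content and, with `Text`'s real length prefix, its length), never the reducer's counts.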
Either use a single reducer, or if the data is huge, use a custom range partitioner with multiple reducers. – 2013-03-18 01:35:10
@chris-white, any example of how to use multiple M/R jobs in sequence? I'm also interested in aggregating the frequent values. – feder 2013-11-26 18:04:48
@feder - best to post a new question with more details – 2013-11-26 18:13:16
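The second pass suggested above boils down to reordering the (actor, count) pairs by count rather than by actor. The target ordering can be illustrated without Hadoop; this is only a sketch of the intended result (class and variable names are illustrative), not the chained-job implementation itself:

```java
import java.util.*;
import java.util.stream.*;

public class ActorMovieCount {
    // Count movies per actor (same tab-split rule as the mapper),
    // then sort the (actor, count) entries by count, descending.
    static List<Map.Entry<String, Integer>> countAndSort(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            String[] tokens = line.split("\\t");
            if (tokens.length == 3) {
                counts.merge(tokens[0], 1, Integer::sum);
            }
        }
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Chan, Jackie (I)\tRush Hour\t1998",
                "Chan, Jackie (I)\tRush Hour 2\t2001",
                "Davi, Robert\tDie Hard\t1988");
        // Prints the actor with more movies first.
        countAndSort(lines).forEach(e ->
                System.out.println(e.getKey() + "\t" + e.getValue()));
    }
}
```

In Hadoop terms, a second job would map each `actor<TAB>count` output line to (count, actor), let the shuffle sort on the now-numeric key (with a descending `IntWritable` comparator), and emit the pairs from a single reducer.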