2017-09-24 69 views
2

我有這個主...使用自定義組合器...它可能被忽略?

job.setMapperClass(AverageIntMapper.class); 
    job.setCombinerClass(AverageIntCombiner.class); 
    job.setReducerClass(AverageIntReducer.class); 

與組合有不同的代碼,但該組合被完全忽略的減速器使用輸出從映射器輸出。

我明白一個Combiner可能不會被使用,但我認爲這是Combiner和Reducer一樣的情況。我真的不明白能夠創建自定義組合器的意義,但系統仍然可以跳過它的使用。

如果不應該發生這種情況,可能是組合器沒有被使用的原因是什麼?

代碼...

import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 


public class AverageInt { 

public static class AverageIntMapper extends Mapper<LongWritable, Text, Text, Text> { 

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 

     String n_string = value.toString(); 
     context.write(new Text("Value"), new Text(n_string)); 
    } 
} 

public static class AverageIntCombiner extends Reducer<Text, Text, Text, Text> { 

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 

     int sum = 0; 
     int count = 0; 

     for(IntWritable value : values) { 
      int temp = Integer.parseInt(value.toString()); 
      sum += value.get(); 
      count += 1; 
     } 

     String sum_count = Integer.toString(sum) + "," + Integer.toString(count); 

     context.write(key, new Text(sum_count)); 
    } 
} 

public static class AverageIntReducer extends Reducer<Text, Text, Text, Text> { 

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 

     int total = 0; 
     int count = 0; 

     for(Text value : values) { 
      String temp = value.toString(); 
      String[] split = temp.split(","); 
      total += Integer.parseInt(split[0]); 
      count += Integer.parseInt(split[1]); 
     } 

     Double average = (double)total/count; 

     context.write(key, new Text(average.toString())); 
    } 
} 

public static void main(String[] args) throws Exception { 

    if(args.length != 2) { 
     System.err.println("Usage: AverageInt <input path> <output path>"); 
     System.exit(-1); 
    } 

    Job job = new Job(); 
    job.setJarByClass(AverageInt.class); 
    job.setJobName("Average"); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    job.setMapperClass(AverageIntMapper.class); 
    job.setCombinerClass(AverageIntCombiner.class); 
    job.setReducerClass(AverageIntReducer.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 
+0

你怎麼知道它被忽略?組合器沒有數量嗎? –

+0

@BinaryNerd在Combiner中創建的值有一個分隔符,我嘗試在Reducer中分割,但當我嘗試分割時,出現數組越界的錯誤。如果我刪除了在Reducer中分割的邏輯並輸出它作爲輸入獲取的值,那麼Mapper會輸出這些值。 – cpd1

+0

我會發布你的代碼,否則它不可能有人能幫助你。 –

回答

1

如果你看一下你的映射器發射:

public void map(LongWritable key, Text value, Context context)

它發送兩個Text對象,但同時你聲明組合類本身正確地說,減少方法有:

public void reduce(Text key, Iterable<IntWritable> values, Context context)

它應該是:

public void reduce(Text key, Iterable<Text> values, Context context)

+0

這看起來像解決了問題。我想我沒有注意到這個問題,因爲編譯/執行時沒有任何錯誤。 – cpd1

+0

這是一個容易犯的錯誤,hadoop將一直使用Reduce類中的基本實現,它只是將數據傳遞過去而不修改它。 –