
I need to perform an aggregation over the results of all the reduce tasks. Basically, each reduce task finds a sum, a count, and a value. I need to add up all the sums and counts and compute the final average. In other words: sharing data between the master and the reduce tasks.

I tried using conf.setInt in the reducer, but when I try to access the value from the main function it fails.

public class FlightData { 

public static class MyReducer 
extends Reducer<Text, Text,Text,IntWritable> { 

    public void reduce(Text key, Iterable<Text> values, 
      Context context 
      ) throws IOException, InterruptedException { 
     int fd = context.getConfiguration().getInt("fd", -1); 
     int fc = context.getConfiguration().getInt("fc", -1); 
     // When I check the values of fd and fc here, they are fine. fd and fc are 
     // shared across all reduce tasks and the updated value is seen by every 
     // reduce task. Only the main function does not seem to have access to them. 
    } 
} 

public static void main(String[] args) throws Exception{ 
    Configuration conf = new Configuration(); 
    conf.setInt("fc", 5); 

    Job job = new Job(conf, "Flight Data"); 
    job.setJarByClass(FlightData.class); 
    job.setMapperClass(TokenizerMapper.class); 
    job.setReducerClass(MyReducer.class); 

    job.setPartitionerClass(FirstPartitioner.class); 
    job.setGroupingComparatorClass(GroupComparator.class); 
    job.setSortComparatorClass(KeyComparator.class); 


    job.setNumReduceTasks(10); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Text.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 


    job.waitForCompletion(true); 

    int flightCount = job.getConfiguration().getInt("fc", -1); 
    int flightDelay = job.getConfiguration().getInt("fd", -1); 
    // Here, when I access fc and fd, I get back 5 & 5. 
    System.out.println("Final " + flightCount + " " + flightDelay + " " + flightDelay/flightCount); 
} 
}

What error are you getting? Could you also add the language you are using as a tag? – 2013-02-24 02:43:00

Answers


Override run() of the mapper and reducer using the new API (org.apache.hadoop.mapreduce). In these methods you can emit the accumulated sum/count from each mapper or reducer.

Additionally, you need to limit the reducer count to 1 in order to get a global sum over the sums generated by the individual mappers.

See the code below for more clarity:

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

public class AggregationExample extends Configured implements Tool { 

    /** 
    * This is Mapper. 
    * 
    */ 
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> { 

     private Text outputKey = new Text(); 
     private Text outputValue = new Text(); 
     private double sum; 

     @Override 
     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 

      try { 
       // say that you need to sum up the value part 
       sum += Double.parseDouble(value.toString()); 
      } catch (NumberFormatException e) { 
       // skip values that do not parse as numbers 
      } 
     } 

     @Override 
     public void run(Context context) throws IOException, InterruptedException { 

      setup(context); 
      while (context.nextKeyValue()) { 
       map(context.getCurrentKey(), context.getCurrentValue(), context); 
      } 

      // emit out the sum per mapper 
      outputKey.set(String.valueOf(sum)); 
      context.write(outputKey, outputValue);// Notice that the outputValue is empty 
      cleanup(context); 

     } 
    } 

    /** 
    * This is Reducer. 
    * 
    */ 
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> { 

     private Text outputKey = new Text(); 
     private Text outputValue = new Text(); 
     private double sum; 

     @Override 
     protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, 
       InterruptedException { 

      // summation of the per-mapper sums, which arrive as keys; the same sum 
      // may be emitted by several mappers, so add it once per occurrence 
      for (Text ignored : values) { 
       sum += Double.parseDouble(key.toString()); 
      } 
     } 

     @Override 
     public void run(Context context) throws IOException, InterruptedException { 

      setup(context); 
      while (context.nextKey()) { 
       reduce(context.getCurrentKey(), context.getValues(), context); 
      } 

      // emit out the global sums 
      outputKey.set(String.valueOf(sum)); 
      context.write(outputKey, outputValue); 
      cleanup(context); 
     } 
    } 

    @Override 
    public int run(String[] args) throws Exception { 

     try { 
      Configuration conf = getConf(); 

      // output key and value separator is empty as in final output only 
      // key is emitted and value is empty 
      conf.set("mapred.textoutputformat.separator", ""); 

      // Configuring mapred to have just one reducer as we need to find 
      // single sum values from all the inputs 
      conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1); 
      conf.setInt("mapred.reduce.tasks", 1); 

      Job job = new Job(conf); 

      job.setJarByClass(AggregationExample.class); 
      job.setJobName("Aggregation Example"); 

      job.setMapperClass(MapJob.class); 
      job.setReducerClass(ReduceJob.class); 
      job.setOutputKeyClass(Text.class); 
      job.setOutputValueClass(Text.class); 

      job.setInputFormatClass(TextInputFormat.class); 
      job.setOutputFormatClass(TextOutputFormat.class); 
      job.setMapOutputKeyClass(Text.class); 
      job.setMapOutputValueClass(Text.class); 
      FileInputFormat.setInputPaths(job, args[0]); 
      FileOutputFormat.setOutputPath(job, new Path(args[1])); 

      boolean success = job.waitForCompletion(true); 
      return success ? 0 : 1; 
     } catch (Exception e) { 
      e.printStackTrace(); 
      return 1; 
     } 

    } 

    public static void main(String[] args) throws Exception { 

     if (args.length < 2) { 
      System.out 
        .println("Usage: AggregationExample <comma sparated list of input directories> <output dir>"); 
      System.exit(-1); 
     } 

     int result = ToolRunner.run(new AggregationExample(), args); 
     System.exit(result); 
    } 

} 
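Two design choices make this work: each mapper emits exactly one record (its local sum) from run() after it has consumed all of its input, and the single reducer then folds those per-mapper sums into one global total. Note that forcing a single reducer gives up all reduce-side parallelism, which is acceptable here because the reducer only sees one record per mapper.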

You can most likely map this approach to your problem.
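To get the average the question asks for, each mapper would also need to emit its record count alongside its sum (for example, as the value part of the emitted pair) so that the single reducer can divide the global sum by the global count.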


Found the solution. I used counters:

http://diveintodata.org/2011/03/15/an-example-of-hadoop-mapreduce-counter/
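
Unlike Configuration values, counters are aggregated by the framework across all map and reduce tasks, and the driver can read the final totals through job.getCounters() once job.waitForCompletion(true) returns.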

public class FlightData {

//enum for counters used by reducers 
public static enum FlightCounters { 
    FLIGHT_COUNT, 
    FLIGHT_DELAY; 
} 
public static class MyReducer 
extends Reducer<Text, Text,Text,IntWritable> { 

    public void reduce(Text key, Iterable<Text> values, 
      Context context 
      ) throws IOException, InterruptedException { 


     float delay1, delay2; 
     // origin and dest are String[] fields parsed from the input values 
     // (the parsing code is omitted in the original post) 
     delay1 = Float.parseFloat(origin[5]); 
     delay2 = Float.parseFloat(dest[5]); 
     context.getCounter(FlightCounters.FLIGHT_COUNT).increment(1); 
     context.getCounter(FlightCounters.FLIGHT_DELAY) 
     .increment((long) (delay1 + delay2)); 

    } 
} 
public static void main(String[] args) throws Exception{ 
    float flightCount, flightDelay; 
    // ... job configured with MyReducer as in the question (setup omitted) 
    job.waitForCompletion(true); 
    // get the final results updated in the counters by all map and reduce tasks 
    flightCount = job.getCounters() 
      .findCounter(FlightCounters.FLIGHT_COUNT).getValue(); 
    flightDelay = job.getCounters() 
      .findCounter(FlightCounters.FLIGHT_DELAY).getValue(); 
} 

}
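
For completeness, here is a minimal driver sketch that ties the counter solution together. It reuses the job setup from the question (TokenizerMapper and the other helper classes are the asker's own) and takes illustrative input/output paths from args; treat it as a sketch under those assumptions, not the poster's exact code:

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Job job = new Job(conf, "Flight Data"); 
    job.setJarByClass(FlightData.class); 
    job.setMapperClass(TokenizerMapper.class); // the asker's mapper 
    job.setReducerClass(MyReducer.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Text.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 
    FileInputFormat.setInputPaths(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    job.waitForCompletion(true); 

    // counter values are aggregated by the framework across all tasks 
    float flightCount = job.getCounters() 
      .findCounter(FlightCounters.FLIGHT_COUNT).getValue(); 
    float flightDelay = job.getCounters() 
      .findCounter(FlightCounters.FLIGHT_DELAY).getValue(); 
    System.out.println("Final " + flightCount + " " + flightDelay 
      + " " + flightDelay/flightCount); 
}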