2016-04-29 218 views
0

我的項目是顯示最高、最低和平均溫度。基本功能我已經完成了,但我必須使用分組鍵(group key)來實現此功能。在我的應用程序中有 4 個單選按鈕,分別對應年、月、日和城市。如果我選擇其中一個,程序會要求我輸入聚合函數(max、min、avg)。為此我需要修改我的 CompositeGroupKey 類,但我不知道該如何著手。所以請幫助我,並說明需要修改哪些代碼。我想使用 Hadoop 顯示最高、最低和平均溫度。

驅動程式(Driver):

import org.apache.hadoop.io.*; 
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.mapreduce.*; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

/**
 * Driver that configures and submits the temperature MapReduce job.
 *
 * <p>Usage: {@code MaxTemperature <input path> <output path>}.
 */
public class MaxTemperature
{
      /**
       * JVM entry point: validates the arguments, wires the mapper and reducer
       * together and blocks until the job finishes.
       *
       * @param args exactly two elements: the input path and the output path
       * @throws Exception if the job cannot be configured, submitted or monitored
       */
      public static void main(String[] args) throws Exception
      {
         if (args.length != 2)
         {
           System.err.println("Please Enter the input and output parameters");
           System.exit(-1);
         }

         // Job's public constructors are deprecated; the static factory is the
         // supported way to obtain an instance.
         Job job = Job.getInstance();
         job.setJarByClass(MaxTemperature.class);
         job.setJobName("Max temperature");

         FileInputFormat.addInputPath(job, new Path(args[0]));
         FileOutputFormat.setOutputPath(job, new Path(args[1]));

         job.setMapperClass(MaxTemperatureMapper.class);
         job.setReducerClass(MaxTemperatureReducer.class);

         // Intermediate (map output) types.
         job.setMapOutputKeyClass(CompositeGroupKey.class);
         job.setMapOutputValueClass(IntWritable.class);

         // Final output types: must match what MaxTemperatureReducer emits,
         // which is CompositeGroupkeyall rather than DoubleWritable.
         job.setOutputKeyClass(CompositeGroupKey.class);
         job.setOutputValueClass(CompositeGroupkeyall.class);

         // Exit status 0 when the job succeeds, 1 otherwise.
         System.exit(job.waitForCompletion(true) ? 0 : 1);
      }

      /**
       * Legacy entry point kept so existing callers of {@code Main} still work.
       * The JVM only recognises the lower-case {@link #main(String[])}.
       *
       * @param args forwarded unchanged to {@link #main(String[])}
       * @throws Exception whatever {@link #main(String[])} throws
       */
      public static void Main (String[] args) throws Exception
      {
         main(args);
      }
}

映射器:

import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.*; 
import java.io.IOException; 

public class MaxTemperatureMapper extends Mapper <LongWritable, Text, CompositeGroupKey, IntWritable> 
{ 
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    { 
    String line = value.toString(); 
    int year = Integer.parseInt(line.substring(0,4)); 
    String mnth = line.substring(7,10); 
    int date = Integer.parseInt(line.substring(10,12)); 
    int temp= Integer.parseInt(line.substring(12,14)); 

    CompositeGroupKey cntry = new CompositeGroupKey(year,mnth, date); 


    context.write(cntry, new IntWritable(temp)); 
      } 
} 

歸約器(Reducer):

import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.mapreduce.*; 
import java.io.IOException; 


public class MaxTemperatureReducer extends Reducer <CompositeGroupKey, IntWritable, CompositeGroupKey, CompositeGroupkeyall >{ 


    public void reduce(CompositeGroupKey key, Iterable<IntWritable> values , Context context) throws IOException,InterruptedException 
      { 


      Double max = Double.MIN_VALUE; 
      Double min =Double.MAX_VALUE; 

      for (IntWritable value : values ) 
      {    
       min = Math.min(min, value.get()); 
       max = Math.max(max, value.get()); 

      } 

      CompositeGroupkeyall val =new CompositeGroupkeyall(max,min); 
      context.write(key, val); 
      } 
} 

和複合鍵:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableUtils; 

class CompositeGroupKey implements WritableComparable<CompositeGroupKey> { 
    int year; 
    String mnth; 
    int date; 
    CompositeGroupKey(int y, String c, int d){ 
     year = y; 
     mnth = c; 
     date = d; 
    } 
    CompositeGroupKey(){} 

    public void write(DataOutput out) throws IOException { 
     out.writeInt(year); 
     WritableUtils.writeString(out, mnth); 
     out.writeInt(date); 
     } 
    public void readFields(DataInput in) throws IOException { 
     this.year = in.readInt(); 
     this.mnth = WritableUtils.readString(in); 
     this.date = in.readInt(); 
     } 
    public int compareTo(CompositeGroupKey pop) { 
     if (pop == null) 
      return 0; 
     int intcnt; 
     intcnt = Integer.valueOf(year).toString().compareTo(Integer.valueOf(pop.year).toString()); 
     if(intcnt != 0){ 
      return intcnt; 
     }else if(mnth.compareTo(pop.mnth) != 0){ 
      return mnth.compareTo(pop.mnth); 
     }else{ 
      return Integer.valueOf(date).toString().compareTo(Integer.valueOf(pop.date).toString()); 
     } 
    } 
    public String toString() { 
     return year + " :" + mnth.toString() + " :" + date; 
     } 
} 









import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.io.WritableComparable; 
class CompositeGroupkeyall implements WritableComparable<CompositeGroupkeyall> { 
    Double max; 
    Double min; 

    CompositeGroupkeyall(double x, double y){ 
     max = x ; 
     min = y ; 
    } 
    CompositeGroupkeyall(){} 

    public void readFields(DataInput in) throws IOException { 
     this.max = in.readDouble(); 
     this.min = in.readDouble(); 
    } 

    public void write(DataOutput out) throws IOException { 
     out.writeDouble(max); 

     out.writeDouble(min); 


     } 

    public int compareTo(CompositeGroupkeyall arg0) { 
     return -1; 
    } 

    public String toString() { 
     return max + " " + min +" " ; 
     } 
} 

回答

0

您可以按照以下方式在 Mapper 中為每筆記錄輸出多個鍵值對,並讓同一個 Reducer 類處理這些數據;這樣按日、按月、按年的分組都會由相同的 Reducer 邏輯來聚合:

// Emit the same reading under four keys so one reducer can aggregate it at
// every grouping level. Year 0, month "ALL" and day 1 are placeholder values
// for the fields a given grouping ignores.

// Full year/month/day key.
CompositeGroupKey fullKey = new CompositeGroupKey(year, mnth, date);
// Grouped by day only.
CompositeGroupKey dayKey = new CompositeGroupKey(0, "ALL", date);
// Grouped by month only.
CompositeGroupKey monthKey = new CompositeGroupKey(0, mnth, 1);
// Grouped by year only.
CompositeGroupKey yearKey = new CompositeGroupKey(year, "ALL", 1);

context.write(fullKey, new IntWritable(temp));
context.write(dayKey, new IntWritable(temp));
context.write(monthKey, new IntWritable(temp));
context.write(yearKey, new IntWritable(temp));
相關問題