2014-12-08 84 views
5

我正在使用hadoop map reduce,我想計算兩個文件。我的第一個Map/Reduce的迭代是給我的一個文件,對ID號碼這樣的:Hadoop多輸入

A 30 
D 20 

我的目標是使用ID從文件到另一個文件相關聯,並與三重奏另一個輸出: ID,號碼,名稱,像這樣:

A ABC 30 
D EFGH 20 

但我不確定使用Map Reduce是否是最好的方法來做到這一點。例如使用文件讀取器讀取第二個輸入文件並通過ID獲取名稱會更好嗎?或者我可以用Map Reduce做到嗎?

如果是這樣,我試圖找出如何。我嘗試了多輸入解決方案:

MultipleInputs.addInputPath(job2, new Path(args[1]+"-tmp"), 
    TextInputFormat.class, FlightsByCarrierMapper2.class); 
MultipleInputs.addInputPath(job2, new Path("inputplanes"), 
    TextInputFormat.class, FlightsModeMapper.class); 

但我想不出任何解決方案將兩者結合起來,並得到了我想要的輸出。我現在的辦法就是給我的名單像這樣的例子:

A ABC 
A 30 
B ABCD 
C ABCDEF 
D EFGH 
D 20 

我去年減少後我得到這個:

N125DL 767-332 
N125DL 7 , 
N126AT 737-76N 
N126AT 19 , 
N126DL 767-332 
N126DL 1 , 
N127DL 767-332 
N127DL 7 , 
N128DL 767-332 
N128DL 3 

我想這一點:N127DL 7 767-332。而且,我不想要那些不合並的。

這是我減少類:

公共類FlightsByCarrierReducer2擴展減速{

String merge = ""; 
protected void reduce(Text token, Iterable<Text> values, Context context) 
          throws IOException, InterruptedException { 

    int i = 0; 
    for(Text value:values) 
    { 
     if(i == 0){ 
      merge = value.toString()+","; 
     } 
     else{ 
      merge += value.toString(); 
     } 
     i++; 
    } 

     context.write(token, new Text(merge)); 

} 

}

更新:

http://stat-computing.org/dataexpo/2009/the-data.html這是我使用的例子。

我試着:TailNum和Canceled是(1或0)獲得對應於TailNum的模型名稱。我的模型文件有一個TailNumb,模型和其他東西。我的電流輸出爲:

N193JB ERJ 190-100 IGW

N194DN 767-332

N19503 EMB-135ER

N19554 EMB-145LR

N195DN 767-332

N195DN 2

先來關鍵,秒該模型,有航班取消鍵,apperas模型

下面我想三人重點,已取消型號,因爲我每個模型

+0

這兩個輸入文件的預期大小是多少? – blackSmith 2014-12-08 08:47:19

+0

第一個大約60萬個條目,第二個大約2k – dex90 2014-12-08 11:18:38

+1

說第二個文件的行長度平均爲100個字節,那麼總大小將大約爲200k。我猜你可以把它放在'DistributedCache'中執行地圖端加入並節省一些燃料;-) – blackSmith 2014-12-08 12:10:45

回答

1

可以使用ID作爲加入他們想取消的數量兩個mapper的關鍵。 你可以寫你的地圖的任務,因爲這樣的事情

public void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException 
{ 
    //Get the line 
    //split the line to get ID seperate 
    //word1 = A 
    //word2 = 30 
       //Likewise for A ABC 
        //word1 = A 
        //word2 = ABC 
    context.write(word1, word2); 
} 

我想你可以resuse同一地圖的任務。 然後編寫一個通用的Reducer作業,其中Hadoop Framework在關鍵基礎上對數據進行分組。 所以你將能夠獲得ID作爲關鍵。 你可以緩存一個值然後concat。

String merge = ""; 
public void reduce(Text key, Iterable<Text> values, Context context) 
{ 
    int i =0; 
    for(Text value:values) 
    { 
     if(i == 0){ 
      merge = value.toString()+","; 
     } 
     else{ 
      merge += value.toString(); 
     } 
     i++; 
    } 
    valEmit.set(merge); 
    context.write(key, valEmit); 
} 

最後,你可以寫你的驅動程序類

public int run(String[] args) throws Exception { 
Configuration c=new Configuration(); 
String[] files=new GenericOptionsParser(c,args).getRemainingArgs(); 
Path p1=new Path(files[0]); 
Path p2=new Path(files[1]); 
Path p3=new Path(files[2]); 
FileSystem fs = FileSystem.get(c); 
if(fs.exists(p3)){ 
    fs.delete(p3, true); 
    } 
Job job = new Job(c,"Multiple Job"); 
job.setJarByClass(MultipleFiles.class); 
MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class); 
MultipleInputs.addInputPath(job,p2, TextInputFormat.class, MultipleMap2.class); 
job.setReducerClass(MultipleReducer.class); 
. 
. 
} 

您可以找到實例HERE

希望這有助於。


UPDATE

輸入1

A 30 
D 20 

輸入2

A ABC 
D EFGH 

缺貨把

A ABC 30 
D EFGH 20 

Mapper.java

import java.io.IOException; 

import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

/** 
* @author sreeveni 
* 
*/ 
public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> { 
    Text keyEmit = new Text(); 
    Text valEmit = new Text(); 

    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException { 
     String line = value.toString(); 
     String parts[] = line.split(" "); 
     keyEmit.set(parts[0]); 
     valEmit.set(parts[1]); 
     context.write(keyEmit, valEmit); 
    } 
} 

Reducer.java

import java.io.IOException; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

/** 
* @author sreeveni 
* 
*/ 
public class ReducerJoin extends Reducer<Text, Text, Text, Text> { 

    Text valEmit = new Text(); 
    String merge = ""; 

    public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 
     String character = ""; 
     String number = ""; 
     for (Text value : values) { 
      // ordering output 
      String val = value.toString(); 
      char myChar = val.charAt(0); 

      if (Character.isDigit(myChar)) { 
       number = val; 
      } else { 
       character = val; 
      } 
     } 
     merge = character + " " + number; 
     valEmit.set(merge); 
     context.write(key, valEmit); 
    } 

} 

Driver類

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

/** 
* @author sreeveni 
* 
*/ 
public class Driver extends Configured implements Tool { 
    public static void main(String[] args) throws Exception { 
     // TODO Auto-generated method stub 
     // checking the arguments count 

     if (args.length != 3) { 
      System.err 
        .println("Usage : <inputlocation> <inputlocation> <outputlocation> "); 
      System.exit(0); 
     } 
     int res = ToolRunner.run(new Configuration(), new Driver(), args); 
     System.exit(res); 

    } 

    @Override 
    public int run(String[] args) throws Exception { 
     // TODO Auto-generated method stub 
     String source1 = args[0]; 
     String source2 = args[1]; 
     String dest = args[2]; 
     Configuration conf = new Configuration(); 
     conf.set("mapred.textoutputformat.separator", " "); // changing default 
                  // delimiter to user 
                  // input delimiter 
     FileSystem fs = FileSystem.get(conf); 
     Job job = new Job(conf, "Multiple Jobs"); 

     job.setJarByClass(Driver.class); 
     Path p1 = new Path(source1); 
     Path p2 = new Path(source2); 
     Path out = new Path(dest); 
     MultipleInputs.addInputPath(job, p1, TextInputFormat.class, 
       Mapper1.class); 
     MultipleInputs.addInputPath(job, p2, TextInputFormat.class, 
       Mapper1.class); 
     job.setReducerClass(ReducerJoin.class); 

     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     job.setOutputFormatClass(TextOutputFormat.class); 

     /* 
     * delete if exist 
     */ 
     if (fs.exists(out)) 
      fs.delete(out, true); 

     TextOutputFormat.setOutputPath(job, out); 
     boolean success = job.waitForCompletion(true); 

     return success ? 0 : 1; 
    } 

} 
+0

我認爲我更接近,但我沒有得到正確的輸出,我不知道爲什麼,只是更新了問題 – dex90 2014-12-08 19:39:11

0

你的reducer有一個map方法,但它應該有一個reduce方法,它接受一個你可以合併的Iterable值集合。由於您沒有reduce()方法,因此您將獲得僅傳遞所有鍵/值對的默認行爲。

+0

該方法的名稱是錯了......我已經注意到了這個錯誤。但它沒有什麼區別。無論如何,我已經嘗試過Iterable Collection,但它沒有奏效。無論如何,我會發布我目前的Reducer。 – dex90 2014-12-11 15:05:37

+0

將@Override標誌添加到該方法以強制編譯器確保您已正確覆蓋它。 – 2014-12-11 17:18:01