2016-05-14 192 views
2

MapReduce Reducer 輸出錯誤:我有以下的大型 TSV 輸入文件:

Site1 Tag1 
Site1 Tag34 
Site1 Tag8 
Site2 Tag75 
Site2 Tag54 
Site2 Tag8 
Site3 Tag24 
Site3 Tag34 
Site3 Tag1 
... 

我想用 Hadoop MapReduce 找出輸入中所有相似的網站對,以及每一對網站之間相似標籤的數量。

針對上面給出的部分輸入,期望的輸出:

Site1 Site2 1 // Site1 is similar to Site2 with 1 tag (Tag8) 
Site1 Site3 2 // Site1 is similar to Site3 with 2 tag (Tag1 and Tag34) 
Site2 Site1 1 
Site3 Site1 2 

我要輸出每每個站點只有10個最相似的網站。

每個站點有3個標籤

我想用2個MapReduce作業:

  1. 第一個作業以標籤為鍵、網站為值進行映射,按標籤歸約;在歸約階段收集擁有該標籤的所有網站,輸出「標籤 SiteX SiteY」。
  2. 第二個 MapReduce 作業讀取第一個作業的輸出,按 (SiteX, SiteY) 對做 GROUP BY,得到每一對相似網站之間相似標籤的數量。

我嘗試實現了第一個 MapReduce 作業,但得到的輸出只是「標籤、網站」。

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class RawToSimilarTagMapper { 

    public static class TagToSiteMapper extends Mapper<Object, Text, Text, Text> { 

     private Text site = new Text(); 
     private Text tag  = new Text(); 

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
      String [] siteTag = value.toString().split("\t"); 
      site.set(siteTag[0]); 
      tag.set(siteTag[1]); 

      context.write(tag, site); 
      System.out.println(); 
     } 
    } 

    public static class SimilarSiteReducer extends Reducer<Text, Text, Text, Text> { 
     private Text value = new Text(); 

     public void reduce(Text key, Iterable<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException, InterruptedException { 
      for (Text text : values) { 
       for (Text text2 : values) { 
        if (!text.equals(text2)) { 
         value.set(text.toString() + "\t" + text2.toString()); 
         output.collect(key, value); 
        } 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     Configuration conf = new Configuration(); 
     Job job = Job.getInstance(conf, "raw-to-similar"); 
     job.setJarByClass(RawToSimilarTagMapper.class); 
     job.setMapperClass(TagToSiteMapper.class); 
     job.setCombinerClass(SimilarSiteReducer.class); 
     job.setReducerClass(SimilarSiteReducer.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 
     FileInputFormat.addInputPath(job, new Path(args[1])); 
     FileOutputFormat.setOutputPath(job, new Path(args[2])); 
     FileSystem fs = null; 
     Path dstFilePath = new Path(args[2]); 
     try { 
      fs = dstFilePath.getFileSystem(conf); 
      if (fs.exists(dstFilePath)) 
       fs.delete(dstFilePath, true); 
     } catch (IOException e1) { 
      e1.printStackTrace(); 
     } 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

我在做什麼錯在這裏?

同樣對於下一個階段,我怎麼能得到每個網站只有前10名最相似的網站?

回答

1

這就是我該怎麼做的。此外,您可以通過在第二份工作的輸出中編寫第三份工作來進行排序,以獲得排名前十的站點(提示:您只需要編寫映射器)注意:這適用於提供問題的示例數據。您可能需要首先清理格式不正確的數據。

最終輸出:

Site2 2 
Site2 Site1 1 
Site3 1 
Site3 Site1 2 

代碼:

import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 

public class TopSites{ 

    public static class TagToSiteMapper extends Mapper<Object, Text, Text, Text> { 

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
      String [] siteTag = value.toString().split("\t"); 
      context.write(new Text(siteTag[1]), new Text(siteTag[0])); 
      System.out.println(siteTag[1] + " --> " + siteTag[0]); 
     } 
    } 


    public static class TagToSiteReducer extends Reducer<Text, Text, Text, Text> { 
     public void reduce(Text key, Iterable<Text> values, Context context) 
       throws IOException, InterruptedException { 
      String l = ""; 
      System.out.print("Key: [" + key.toString() + "] Values: ["); 

      for (Text site : values) 
       l += site + "\t"; 

      l=l.substring(0, l.length()-1); 
      System.out.println(l + "]"); 
      context.write(new Text(key), new Text(l)); 
     } 
    } 
    public static class TopSiteMapper extends Mapper<Object, Text, Text, IntWritable> { 
     private final static IntWritable one = new IntWritable(1); 
     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 

      String [] data = value.toString().split("\t"); 
      String sites =""; 
      System.out.println("map received: "+ value.toString()); 

      for(int i=1;i<data.length;i++) 
       sites += data[i] + "\t";  

      System.out.println(sites.substring(0,sites.length()-1)); 
      context.write(new Text(sites.substring(0,sites.length()-1)), one); 
     } 
    } 

    public static class TopSiteReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 
     public void reduce(Text key, Iterable<IntWritable> values, Context context) 
       throws IOException, InterruptedException { 
      int sum = 0; 
      System.out.print("Key: [" + key.toString() + "] Values: ["); 

      for (IntWritable site : values){ 
       System.out.print(site.get()); 
       sum+=site.get(); 
      } 
      System.out.println("]"); 
      context.write(new Text(key), new IntWritable(sum)); 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     Configuration conf = new Configuration(); 

     Job job = Job.getInstance(conf, "site-to-tag"); 

     job.setJarByClass(TopSites.class); 
     job.setMapperClass(TagToSiteMapper.class); 
     job.setReducerClass(TagToSiteReducer.class); 

     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     MultipleInputs.addInputPath(job, new Path(args[0]),TextInputFormat.class, TagToSiteMapper.class); 

     Path outputpath = new Path(args[1]+"_temp"); 
     FileOutputFormat.setOutputPath(job,outputpath); 

     FileSystem fs = null; 
     Path dstFilePath = new Path(args[1]); 
     try { 
      fs = dstFilePath.getFileSystem(conf); 
      if (fs.exists(dstFilePath)) 
       fs.delete(dstFilePath, true); 

      dstFilePath = new Path(args[1]+"_temp"); 
      fs = dstFilePath.getFileSystem(conf); 
      if (fs.exists(dstFilePath)) 
       fs.delete(dstFilePath, true); 
     } catch (IOException e1) { 
      e1.printStackTrace(); 
     } 

     int code = job.waitForCompletion(true)?0:1; 
     if(code == 0) 
     { 
      Job SecondJob = Job.getInstance(conf, "Tag-to-Sites"); 
      SecondJob.setJarByClass(TopSites.class); 

      SecondJob.setOutputKeyClass(Text.class); 
      SecondJob.setOutputValueClass(IntWritable.class); 

      SecondJob.setMapperClass(TopSiteMapper.class); 
      SecondJob.setCombinerClass(TopSiteReducer.class); 
      SecondJob.setReducerClass(TopSiteReducer.class); 


      FileInputFormat.addInputPath(SecondJob,new Path(args[1]+ "_temp")); 
      FileOutputFormat.setOutputPath(SecondJob,new Path(args[1])); 
      int exitCode = SecondJob.waitForCompletion(true)?0:1; 
      FileSystem.get(conf).delete(new Path(args[1]+"_temp"), true); 
      System.exit(exitCode); 
     } 
    } 
} 

控制檯STD輸出:

Tag1 --> Site1 
Tag34 --> Site1 
Tag8 --> Site1 
Tag75 --> Site2 
Tag54 --> Site2 
Tag8 --> Site2 
Tag24 --> Site3 
Tag34 --> Site3 
Tag1 --> Site3 
Key: [Tag1] Values: [Site3 Site1] 
Key: [Tag24] Values: [Site3] 
Key: [Tag34] Values: [Site3 Site1] 
Key: [Tag54] Values: [Site2] 
Key: [Tag75] Values: [Site2] 
Key: [Tag8] Values: [Site2 Site1] 
map received: Tag1 Site3 Site1 
Site3 Site1 
map received: Tag24 Site3 
Site3 
map received: Tag34 Site3 Site1 
Site3 Site1 
map received: Tag54 Site2 
Site2 
map received: Tag75 Site2 
Site2 
map received: Tag8 Site2 Site1 
Site2 Site1 
Key: [Site2] Values: [11] 
Key: [Site2 Site1] Values: [1] 
Key: [Site3] Values: [1] 
Key: [Site3 Site1] Values: [11] 
Key: [Site2] Values: [2] 
Key: [Site2 Site1] Values: [1] 
Key: [Site3] Values: [1] 
Key: [Site3 Site1] Values: [2] 
+0

對於第二/第三個作業,你也可以使用 Pig 腳本 –

0

看起來是你的 combiner(組合器)在這裏出了問題。mapper 和 combiner 的輸出類型必須相同,而你的情況並非如此。combiner 只是一種性能優化,你可以把它註釋掉之後再運行試試。