
MapReduce - reducer emits output on a single line

I have a simple MapReduce job that is supposed to read a dictionary from a text file and then process another large file line by line to compute an inverse document matrix. The output should look like this:

word-id1 docX:tfX docY:tfY 
word-id2 docX:tfX docY:tfY etc... 

However, the reducer emits its output on one single huuuge line. I do not understand why, since it should emit a new line for every word-id (which is the key of the reducer).

The mapper produces the correct output (pairs of word-id and doc-id:tf on separate lines); I tested it with no reducer. The reducer is supposed to append, on one line per key, the values that correspond to that key.

Could you please take a look at my code (especially the reducer and the job configuration) and tell me why the reducer emits just one giant line instead of multiple lines corresponding to the keys? I have spent many hours debugging this and cannot get my head around it.

public class Indexer extends Configured implements Tool { 

    /* 
    * Vocabulary: key = term, value = index 
    */ 
    private static Map<String, Integer> vocab = new HashMap<String, Integer>(); 

    public static void main(String[] arguments) throws Exception { 
     System.exit(ToolRunner.run(new Indexer(), arguments)); 
    } 

    public static class Comparator extends WritableComparator { 
     protected Comparator() { 
      super(Text.class, true); 
     } 

     @Override 
     public int compare(WritableComparable a, WritableComparable b) { 
      // Here we exploit the implementation of compareTo(...) in 
      // Text.class. 
      return -a.compareTo(b); 
     } 
    } 

    public static class IndexerMapper extends 
      Mapper<Object, Text, IntWritable, Text> { 
     private Text result = new Text(); 

     // load vocab from distributed cache 
     public void setup(Context context) throws IOException { 
      Configuration conf = context.getConfiguration(); 
      FileSystem fs = FileSystem.get(conf); 
      URI[] cacheFiles = DistributedCache.getCacheFiles(conf); 
      Path getPath = new Path(cacheFiles[0].getPath()); 

      BufferedReader bf = new BufferedReader(new InputStreamReader(
        fs.open(getPath))); 
      String line = null; 
      while ((line = bf.readLine()) != null) { 
       StringTokenizer st = new StringTokenizer(line, " \t"); 

       int index = Integer.parseInt(st.nextToken()); // first token is the line number - term id 
       String word = st.nextToken(); // second element is the term 

       // save vocab 
       vocab.put(word, index); 

      } 
     } 

     public void map(Object key, Text value, Context context) 
       throws IOException, InterruptedException { 

      // init TF map 
      Map<String, Integer> mapTF = new HashMap<String, Integer>(); 

      // parse input string 
      StringTokenizer st = new StringTokenizer(value.toString(), " \t"); 

      // first element is doc index 
      int index = Integer.parseInt(st.nextToken()); 

      // count term frequencies 
      String word; 
      while (st.hasMoreTokens()) { 
       word = st.nextToken(); 

       // check if word is in the vocabulary 
       if (vocab.containsKey(word)) { 
        if (mapTF.containsKey(word)) { 
         int count = mapTF.get(word); 
         mapTF.put(word, count + 1); 
        } else { 
         mapTF.put(word, 1); 
        } 
       } 
      } 

      // compute TF-IDF 
      int wordIndex; 
      for (String term : mapTF.keySet()) { 
       int tf = mapTF.get(term); 

       if (vocab.containsKey(term)) { 
        wordIndex = vocab.get(term); 

        context.write(new IntWritable(wordIndex), new Text(index + ":" + tf)); 
       } 

      }    
     } 
    } 

    public static class IndexerReducer extends Reducer<IntWritable, Text, IntWritable, Text> 
    { 
     @Override 
     public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
     { 

      StringBuilder sb = new StringBuilder(16000); 

      for (Text value : values) 
      { 
       sb.append(value.toString() + " "); 
      } 


      context.write(key, new Text(sb.toString())); 
     } 
    } 

    /** 
    * This is where the MapReduce job is configured and being launched. 
    */ 
    @Override 
    public int run(String[] arguments) throws Exception { 
     ArgumentParser parser = new ArgumentParser("TextPreprocessor"); 

     parser.addArgument("input", true, true, "specify input directory"); 
     parser.addArgument("output", true, true, "specify output directory"); 

     parser.parseAndCheck(arguments); 

     Path inputPath = new Path(parser.getString("input")); 
     Path outputDir = new Path(parser.getString("output")); 

     // Create configuration. 
     Configuration conf = getConf(); 

     // add distributed file with vocabulary 
     DistributedCache 
       .addCacheFile(new URI("/user/myslima3/vocab.txt"), conf); 

     // Create job. 
     Job job = new Job(conf, "WordCount"); 
     job.setJarByClass(IndexerMapper.class); 

     // Setup MapReduce. 
     job.setMapperClass(IndexerMapper.class); 
     //job.setCombinerClass(IndexerReducer.class); 
     job.setReducerClass(IndexerReducer.class); 

     // Sort the output words in reversed order. 
     job.setSortComparatorClass(Comparator.class); 


     job.setNumReduceTasks(1); 

     // Specify (key, value). 
     job.setMapOutputKeyClass(IntWritable.class); 
     job.setMapOutputValueClass(Text.class); 
     job.setOutputKeyClass(IntWritable.class); 
     job.setOutputValueClass(Text.class); 

     // Input. 
     FileInputFormat.addInputPath(job, inputPath); 
     job.setInputFormatClass(TextInputFormat.class); 

     // Output. 
     FileOutputFormat.setOutputPath(job, outputDir); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     FileSystem hdfs = FileSystem.get(conf); 

     // Delete output directory (if exists). 
     if (hdfs.exists(outputDir)) 
      hdfs.delete(outputDir, true); 

     // Execute the job. 
     return job.waitForCompletion(true) ? 0 : 1; 
    } 
} 
Just to confirm, are you getting distinct keys in the mapper output? Could you also update the sample output? You could also view the file in WordPad to check for delimiters; if your lines are very long, you may simply be overlooking the line breaks. –

Yes, I am getting distinct keys from the mapper, that is confirmed... The mapper's output format is key[TAB]value – Smajl

How did you confirm that your mapper output is correct? Did you set the number of reducers to 0? Also, I think you need to cast the objects in the comparator? Just try removing the custom comparator and see if it makes any difference? –

Answer


Try these steps to debug your problem:

  • Set the number of reducers to 0 and look at what the mapper actually outputs.
  • Try the default comparator; if you do keep a custom one, you also need to cast the objects inside compare(...), otherwise it will not compare correctly (see the sketch below).
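
For illustration, here is a minimal sketch of what such a cast-based comparator could look like. The class name ReverseIntComparator is hypothetical, and it assumes the intent is a descending sort of the map output keys; note that the posted comparator is registered for Text.class even though the job declares IntWritable as the map output key class:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ReverseIntComparator extends WritableComparator {

    protected ReverseIntComparator() {
        // register for the actual map output key class of the job
        super(IntWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // cast first, then negate the result to reverse the natural int ordering
        return -((IntWritable) a).compareTo((IntWritable) b);
    }
}

With the custom comparator removed (or replaced by one that matches the key type, as above), each reduce(...) call receives one distinct key, and TextOutputFormat writes one line per context.write(...). For the map-only debugging run, job.setNumReduceTasks(0) makes the raw mapper output land directly in the output directory.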