2011-09-27

I want to use a MultithreadedMapper in my MapReduce job. When I replace the Mapper with a MultithreadedMapper, I get a type mismatch on the map output key.

To do this, I took working code and replaced the Mapper with a MultithreadedMapper.

Here is the exception I get:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable 
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862) 
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549) 
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) 
at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211) 
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) 
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124) 
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) 
at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264) 

Here is the job setup code:

public static void main(String[] args) { 
    try { 
     if (args.length != 2) { 
      System.err.println("Usage: MapReduceMain <input path> <output path>"); 
      System.exit(123); 
     } 
     Job job = new Job(); 
     job.setJarByClass(MapReduceMain.class); 
     job.setInputFormatClass(TextInputFormat.class); 
     FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration()); 
     FileStatus[] files = fs.listStatus(new Path(args[0])); 
     for(FileStatus sfs:files){ 
      FileInputFormat.addInputPath(job, sfs.getPath()); 
     } 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     job.setMapperClass(MyMultithreadMapper.class); 
     job.setReducerClass(MyReducer.class); 
     MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads); 

     job.setOutputKeyClass(IntWritable.class); 
     job.setOutputValueClass(MyPage.class); 

     job.setOutputFormatClass(SequenceFileOutputFormat.class);//write the result as sequential file 

     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

And here is the mapper code:

public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> { 

ConcurrentLinkedQueue<MyScraper> scrapers = new ConcurrentLinkedQueue<MyScraper>(); 

public static final int    nThreads = 5; 

public MyMultithreadMapper() { 
    for (int i = 0; i < nThreads; i++) { 
     scrapers.add(new MyScraper()); 
    } 
} 

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
    MyScraper scraper = scrapers.poll(); 

    MyPage result = null; 
    for (int i = 0; i < 10; i++) { 
     try { 
      result = scraper.scrapPage(value.toString(), true); 
      break; 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    if (result == null) { 
     result = new MyPage(); 
     result.setUrl(key.toString()); 
    } 

    context.write(new IntWritable(result.getUrl().hashCode()), result); 

    scrapers.add(scraper); 
}
}

Why am I getting this?

Answer

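Here is what you need to do:

MultithreadedMapper.setMapperClass(job, MyMapper.class);

MyMapper must implement the map logic.

MyMultithreadMapper must be empty. MultithreadedMapper.run() never calls a map() method defined on the subclass; each of its threads creates its own instance of the class registered with setMapperClass, which defaults to the identity Mapper. The identity mapper re-emits the LongWritable offset keys produced by TextInputFormat, which is why the stack trace shows Mapper.map called from MultithreadedMapper$MapRunner and the job fails with the LongWritable/IntWritable mismatch.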
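To see why the map() override in MyMultithreadMapper is ignored, here is a simplified plain-Java model of the delegation pattern, written without Hadoop so it runs on its own. All class names (SimpleMapper, SimpleMultithreadedMapper, WrapperWithLogic, UrlHashMapper) are hypothetical stand-ins, not Hadoop's API, and the model leaves out contexts, record readers and Writable types. It assumes, as Hadoop's MultithreadedMapper does, that the wrapper's run() starts N threads, that each thread creates its own instance of the configured inner mapper class, and that the default inner class is the identity mapper.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for Hadoop's Mapper. Like Hadoop's base class,
// the default map() is the identity: it re-emits the INPUT key.
class SimpleMapper {
    void map(Long key, String value, List<Map.Entry<Object, String>> out) {
        out.add(new SimpleImmutableEntry<>(key, value));
    }
}

// Hypothetical stand-in for MultithreadedMapper. run() never calls this.map();
// every worker thread builds its own instance of the configured inner class.
class SimpleMultithreadedMapper extends SimpleMapper {
    // Default inner class is the identity mapper (assumed to mirror Hadoop).
    private Class<? extends SimpleMapper> innerClass = SimpleMapper.class;

    void setMapperClass(Class<? extends SimpleMapper> cls) {
        innerClass = cls;
    }

    private SimpleMapper newInner() {
        try {
            return innerClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + innerClass, e);
        }
    }

    List<Map.Entry<Object, String>> run(List<String> lines, int nThreads) {
        // Shared input of (byte offset, line) pairs, like TextInputFormat.
        ConcurrentLinkedQueue<SimpleImmutableEntry<Long, String>> input = new ConcurrentLinkedQueue<>();
        long offset = 0;
        for (String line : lines) {
            input.add(new SimpleImmutableEntry<>(offset, line));
            offset += line.length() + 1;
        }
        List<Map.Entry<Object, String>> out = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < nThreads; t++) {
            SimpleMapper inner = newInner(); // one mapper object per thread
            Thread worker = new Thread(() -> {
                SimpleImmutableEntry<Long, String> rec;
                while ((rec = input.poll()) != null) {
                    inner.map(rec.getKey(), rec.getValue(), out);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return out;
    }
}

// Like the question: map logic placed on the multithreaded wrapper. Never runs.
class WrapperWithLogic extends SimpleMultithreadedMapper {
    @Override
    void map(Long key, String value, List<Map.Entry<Object, String>> out) {
        out.add(new SimpleImmutableEntry<>(value.hashCode(), value));
    }
}

// Like the answer: map logic in a plain mapper registered via setMapperClass.
class UrlHashMapper extends SimpleMapper {
    @Override
    void map(Long key, String value, List<Map.Entry<Object, String>> out) {
        out.add(new SimpleImmutableEntry<>(value.hashCode(), value)); // Integer key
    }
}
```

With the logic on the wrapper, every key comes back as the Long offset, the same LongWritable-for-IntWritable mismatch reported above; once the logic moves to a separate class registered through setMapperClass, the keys are Integer hashes. In the real job this should correspond to job.setMapperClass(MultithreadedMapper.class), MultithreadedMapper.setMapperClass(job, MyMapper.class) and MultithreadedMapper.setNumberOfThreads(job, 5), with MyMapper extending Mapper<LongWritable, Text, IntWritable, MyPage>. Because each thread gets its own MyMapper instance, a single MyScraper field per instance could probably replace the shared ConcurrentLinkedQueue.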
