2012-01-27 59 views
3

我鏈接的 Map Reduce 作業(第一個作業的輸出作爲第二個作業的輸入)出現結構/配置錯誤

public class ChainingMapReduce { 



    public static class ChainingMapReduceMapper 
    extends Mapper<Object, Text, Text, IntWritable>{ 

     public void map(Object key, Text value, Context context 
       ) throws IOException, InterruptedException { 

          // code 

      } 
     } 
    } 


    public static class ChainingMapReduceReducer 
    extends Reducer<Text,IntWritable,Text,IntWritable> { 


     public void reduce(Text key, Iterable<IntWritable> values, 
        Context context 
        ) throws IOException, InterruptedException { 

      //code 

        } 
    } 


    public static class ChainingMapReduceMapper1 
    extends Mapper<Object, Text, Text, IntWritable>{ 

     public void map(Object key, Text value, Context context 
       ) throws IOException, InterruptedException { 

      //code 
      } 
     } 
    } 

    public static class ChainingMapReduceReducer1 
    extends Reducer<Text,IntWritable,Text,IntWritable> { 


     public void reduce(Text key, Iterable<IntWritable> values, 
        Context context 
        ) throws IOException, InterruptedException { 

      //code 
     } 
    } 

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { 

     Configuration conf = new Configuration(); 

     Job job = new Job(conf, "First"); 
     job.setJarByClass(ChainingMapReduce.class); 
     job.setMapperClass(ChainingMapReduceMapper.class); 
     job.setCombinerClass(ChainingMapReduceReducer.class); 
     job.setReducerClass(ChainingMapReduceReducer.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(IntWritable.class); 


    FileInputFormat.addInputPath(job, new Path("/home/Desktop/log")); 
     FileOutputFormat.setOutputPath(job, new Path("/home/Desktop/temp/output"));   
     job.waitForCompletion(true); 


     System.out.println("First Job Completed.....Starting Second Job"); 
     System.out.println(job.isSuccessful()); 


     /* FileSystem hdfs = FileSystem.get(conf); 

     Path fromPath = new Path("/home/Desktop/temp/output/part-r-00000"); 
     Path toPath = new Path("/home/Desktop/temp/output1"); 
     hdfs.rename(fromPath, toPath); 
     conf.clear(); 

     */ 
     if(job.isSuccessful()){ 
      Configuration conf1 = new Configuration(); 
      Job job1 = new Job(conf1,"Second"); 
      job1.setJarByClass(ChainingMapReduce.class); 
      job1.setMapperClass(ChainingMapReduceMapper1.class); 
      job1.setCombinerClass(ChainingMapReduceReducer1.class); 
      job1.setReducerClass(ChainingMapReduceReducer1.class); 
      job1.setOutputKeyClass(Text.class); 
      job1.setOutputValueClass(IntWritable.class); 
      FileInputFormat.addInputPath(job, new Path("/home/Desktop/temp/output/part-r-00000)"); 
      FileOutputFormat.setOutputPath(job, new Path("/home/Desktop/temp/output1")); 
      System.exit(job1.waitForCompletion(true) ? 0 : 1); 
     } 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 

    } 

    } 

當我運行這個程序...首先,工作完美地得到執行,並且下面的錯誤後,來了:

First Job Completed.....Starting Second Job(接着印出 job.isSuccessful() 的結果:true)

12/01/27 15:24:21信息jvm.JvmMetrics:無法初始化JVM度量標準 with processName = JobTracker,sessionId = - 已經初始化12/01/27 15:24:21警告mapred.JobClient:使用GenericOptionsParser解析 參數。應用程序應該實現相同的工具。 12/01/27 15:24:21警告mapred.JobClient:沒有作業jar文件集。可能找不到用戶 的類。請參閱JobConf(Class)或 JobConf#setJar(String)。 12/01/27 15:24:21信息mapred.JobClient: 清理分段區域 file:/tmp/hadoop/mapred/staging/4991311720439552/.staging/job_local_0002 線程「main」中的異常 org.apache .hadoop.mapred.InvalidJobConfException:輸出目錄不是 集。在 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:123) 在org.apache.hadoop.mapred.JobClient $ 2.run(JobClient.java:872)在 org.apache .hadoop.mapred.JobClient $ 2.run(JobClient.java:833)at java.security.AccessController.doPrivileged(Native Method)at javax.security.auth.Subject.doAs(Subject.java:396)at org .apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127) at org.apache.hadoop.mapreduce.Job上的org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:833) 。提交(Job.java:476) org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:506)at Chaini ngMapReduce.main(ChainingMapReduce.java:129)

我嘗試過讓兩個作業共用同一個「conf」,也嘗試過分別用「conf」和「conf1」作爲各自作業的配置,結果相同。

回答

4

變化

FileInputFormat.addInputPath(job, new Path("/home/Desktop/temp/output/part-r-00000)"); 
FileOutputFormat.setOutputPath(job, new Path("/home/Desktop/temp/output1")); 

FileInputFormat.addInputPath(job1, new Path("/home/Desktop/temp/output/part-r-00000")); 
FileOutputFormat.setOutputPath(job1, new Path("/home/Desktop/temp/output1")); 

的第二份工作。

也可以考慮使用 o.a.h.mapred.jobcontrol.Job,或者 Apache Oozie 來編排作業鏈。

+0

謝謝..我弄錯了.. – pradeep 2012-01-27 08:20:52

相關問題