I am new to Hadoop and am trying to write a relational join with it. The algorithm joins three relations in two consecutive rounds, using a recursive approach. The program works fine, but during execution it keeps printing warnings like this one ("EBADF: Bad file descriptor"):

14/12/02 10:41:16 WARN io.ReadaheadPool: Failed readahead on ifile                             
EBADF: Bad file descriptor                                       
     at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posix_fadvise(Native Method)                         
     at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posixFadviseIfPossible(NativeIO.java:263)                     
     at org.apache.hadoop.io.nativeio.NativeIO$POSIX$CacheManipulator.posixFadviseIfPossible(NativeIO.java:142)                 
     at org.apache.hadoop.io.ReadaheadPool$ReadaheadRequestImpl.run(ReadaheadPool.java:206)                      
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)                       
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)                       
     at java.lang.Thread.run(Thread.java:745) 

This is quite annoying, and I would like to know what causes these warnings and how to get rid of them. My code is as follows:

import java.io.IOException; 
import java.util.ArrayList; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class Recursive { 
    /** 
    * Join three relations together using recursive method 
    * R JOIN S JOIN T = ((R JOIN S) JOIN T) 
    */ 
    static String[] relationSequence;   // Keeps sequence of relations in join 
    static int round;       // Round number running 
    /** 
    * Mapper 
    * Relation name = R 
    * Input tuple = a b 
    * Output pair = (b, (R,a)) 
    * We assume that join value is the last attribute for the first relation 
    * and the first attribute for the second relation. 
    * So using this assumption, this map-reduce algorithm will work for any number of attributes 
    */ 
    public static class joinMapper extends Mapper<Object, Text, IntWritable, Text>{ 
     public void map(Object keyIn, Text valueIn, Context context) throws IOException, InterruptedException { 
      // Read tuple and put attributes in a string array 
      String curValue = valueIn.toString(); 
      String[] values = curValue.split("\t"); 
      // Get relation name from input file name 
      String fileName = ((FileSplit) context.getInputSplit()).getPath().getName(); 
      // Get join attribute index number R join S 
      int joinIndex; 
      String others = ""; 
      if(fileName.compareTo(relationSequence[round])==0){ 
       // Second relation: join attribute is the first attribute 
       joinIndex = 0; 
       others = curValue.substring(2);  // skip the join attribute and the tab (single-character values assumed) 
      }else{ 
       // First relation: join attribute is the last attribute 
       joinIndex = values.length - 1; 
       others = curValue.substring(0, curValue.length()-2); // drop the trailing tab and join attribute 
      } 
      IntWritable joinValue = new IntWritable(Integer.parseInt(values[joinIndex])); 

      // Create list of attributes which are not join attribute 
      Text temp = new Text(fileName + "|" + others); 
      context.write(joinValue,temp); 
     } 
    } 

    /** 
    * Reducer 
    * 
    * 1. Divide the input list in two ArrayLists based on relation name: 
    *  a. first relation 
    *  b. second relation 
    * 2. Test if the second relation is not empty. If it's so, we shouldn't continue. 
    * 3. For each element of the first array list, join it with the all elements in 
    *  the second array list 
    */ 
    public static class joinReducer extends Reducer<IntWritable, Text, Text, Text>{ 
     public void reduce(IntWritable keyIn, Iterable<Text> valueIn, Context context) 
       throws IOException, InterruptedException{ 
      ArrayList<String> firstRelation = new ArrayList<String>(); 
      ArrayList<String> secondRelation = new ArrayList<String>(); 
      for (Text value : valueIn) { 
       String[] values = value.toString().split("\\|"); 
       if(values[0].compareTo(relationSequence[round])==0){ 
        secondRelation.add(values[1]); 
       }else{ 
        firstRelation.add(values[1]); 
       } 
      } 
      if(secondRelation.size()>0){ 
       for (String firstItem : firstRelation) { 
        for (String secondItem : secondRelation) { 
         context.write(new Text(firstItem.toString()), new Text(keyIn.toString() + "\t" 
                      + secondItem.toString() 
                      )); 
        } 
       } 
      } 
     } 

    } 

    /** 
    * Partitioner 
    * 
    * To hash pairs to reducer tasks, we use a bitwise AND (key & 0x7F), 
    * which is faster than the modulo operator. 
    */ 
    public static class joinPartitioner extends Partitioner<IntWritable, Text> { 
     public int getPartition(IntWritable key, Text value, int numReduceTasks) { 
        int partitionNumber = key.get() & 0x007F; // key mod 128, matching the 128 reduce tasks 
       return partitionNumber; 
      } 
    } 

    /** 
     * Main method 
     * 
     * (R join S join T) 
     * hadoop jar ~/COMP6521.jar Recursive /input/R /input/S /input2/T /output R,S,T 
     * 
     * @param args 
     * <br> args[0]: first relation 
     * <br> args[1]: second relation 
     * <br> args[2]: third relation 
     * <br> args[3]: output directory 
     * <br> args[4]: relation sequence to join, separated by comma 
     */ 
    public static void main(String[] args) throws IllegalArgumentException, IOException, InterruptedException, ClassNotFoundException { 
     long s = System.currentTimeMillis(); 
     /****** Preparing problem variables *******/ 
     relationSequence = args[4].split(",");  // Keep sequence of relations 
     round = 1;         // Variable to keep current round number 
     int maxOfReducers = 128;     // Maximum number of available reducers 
     int noReducers;        // Number of reducers for one particular job 
     noReducers = maxOfReducers; 

     Path firstRelation = new Path(args[0]); 
     Path secondRelation = new Path(args[1]); 
     Path thirdRelation = new Path(args[2]); 
     Path temp   = new Path("/temp"); // Temporary path to keep intermediate result 
     Path out   = new Path(args[3]); 
     /****** End of variable Preparing *******/ 

     Configuration conf = new Configuration(); 

     /****** Configuring first job *******/ 
//  General configuration 
     Job job = Job.getInstance(conf, "Recursive multi-way join (first round)"); 
     job.setNumReduceTasks(noReducers); 

//  Pass appropriate classes 
     job.setJarByClass(Recursive.class); 
     job.setMapperClass(joinMapper.class); 
     job.setPartitionerClass(joinPartitioner.class); 
     job.setReducerClass(joinReducer.class); 

//  Specify input and output type of reducers 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     job.setMapOutputKeyClass(IntWritable.class); 
     job.setMapOutputValueClass(Text.class); 
     FileSystem fs = FileSystem.get(conf); 
     if(fs.exists(temp)){ fs.delete(temp, true);} 
     if(fs.exists(out)) { fs.delete(out, true); } 

//  Specify the input and output paths 
     FileInputFormat.addInputPath(job, firstRelation); 
     FileInputFormat.addInputPath(job, secondRelation); 
     FileOutputFormat.setOutputPath(job, temp); 
     /****** End of first job configuration *******/ 
     job.submit(); 
//  Running the first job 
     boolean b = job.waitForCompletion(true); 
     if(b){ 
//   try to execute the second job after completion of the first one 
      round++;     // Specify round number 
      Configuration conf2 = new Configuration(); // Create new configuration object 

      /****** Configuring second job *******/ 
//   General configuration 
      Job job2 = Job.getInstance(conf2, "Recursive multi-way join (second round)"); 
      job2.setNumReduceTasks(noReducers); 

//   Pass appropriate classes 
      job2.setJarByClass(Recursive.class); 
      job2.setMapperClass(joinMapper.class); 
      job2.setPartitionerClass(joinPartitioner.class); 
      job2.setReducerClass(joinReducer.class); 

//   Specify input and output type of reducers 
      job2.setOutputKeyClass(Text.class); 
      job2.setOutputValueClass(Text.class); 

//   Specify input and output type of mappers 
      job2.setMapOutputKeyClass(IntWritable.class); 
      job2.setMapOutputValueClass(Text.class); 
//   End of 2014-11-25 
//   Specify the input and output paths 
      FileInputFormat.addInputPath(job2, temp); 
      FileInputFormat.addInputPath(job2, thirdRelation); 
      FileOutputFormat.setOutputPath(job2, out); 
      /****** End of second job configuration *******/ 
      job2.submit(); 
//   Running the second job 
      b = job2.waitForCompletion(true); 

//   Output time measurement 
      long e = System.currentTimeMillis() - s; 
      System.out.println("Total: " + e); 
      System.exit(b ? 0 : 1); 
     } 
     System.exit(1); 
    } 

} 

Answer


I had a similar error; while looking into it I ended up at your question and at this mail list thread: EBADF: Bad file descriptor

To clarify a bit, the readahead pool sometimes spits out this message if you close a file while a readahead request is still running. It is not an error; it just reflects the fact that the file was closed somewhat hastily, probably because of some other error, which is the real problem.

In my case, I was closing a writer without hflush-ing it first. Since you do not seem to be using writers or readers by hand, I would look at how you are submitting your MR jobs.
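
To illustrate what that advice refers to (this is not the poster's code), here is a minimal sketch of a hand-managed HDFS writer that is flushed before it is closed; the class name, path and data are made up for the example:

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 

public class WriterFlushSketch { 
    public static void main(String[] args) throws Exception { 
        Configuration conf = new Configuration(); 
        FileSystem fs = FileSystem.get(conf); 
        // Hypothetical output path, used only for this example 
        Path out = new Path("/tmp/writer-flush-sketch"); 
        FSDataOutputStream writer = fs.create(out, true); 
        try { 
            writer.writeBytes("1\t2\n"); 
            // Flush explicitly before closing; in my case the warning came 
            // from closing a writer without calling hflush() first. 
            writer.hflush(); 
        } finally { 
            writer.close(); 
        } 
    } 
} 

Since the posted code does not open readers or writers by hand, the submission pattern is the other place to check: calling job.submit() and then job.waitForCompletion(true) is redundant, because waitForCompletion() submits the job itself when it has not been submitted yet, so waitForCompletion(true) alone should be enough.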