
I have thousands of small files that I want to process with CombineFileInputFormat. However, isSplitable does not seem to take effect in CombineFileInputFormat.

With CombineFileInputFormat, one mapper receives multiple small files, and each individual file should not be split.

Here is a fragment of one of these small input files:

vers,3 
period,2015-01-26-18-12-00,438469546,449329626,complete 
config,libdvm.so,chromeview 
pkgproc,com.futuredial.digitchat,10021,,0ns:10860078 
pkgpss,com.futuredial.digitchat,10021,,0ns:9:6627:6627:6637:5912:5912:5912 
pkgsvc-run,com.futuredial.digitchat,10021,.LiveScreenService,1,0n:10860078 
pkgsvc-start,com.futuredial.digitchat,10021,.LiveScreenService,1,0n:10860078 
pkgproc,com.google.android.youtube,10103,,0ns:10860078 
pkgpss,com.google.android.youtube,10103,,0ns:9:12986:13000:13021:11552:11564:11580 
pkgsvc-run,com.google.android.youtube,10103,com.google.android.apps.youtube.app.offline.transfer.OfflineTransferService,1,0n:10860078 
pkgsvc-start,com.google.android.youtube,10103,com.google.android.apps.youtube.app.offline.transfer.OfflineTransferService,1,0n:10860078 

I want to pass the whole file content to the mapper. However, Hadoop cuts the file in the middle.

For example, the file above may be cut into:

vers,3 
period,2015-01-26-18-12-00,438469546,449329626,complete 
config,libdvm.so,chromeview 
pkgproc,com.futuredial.digitchat,#the line has been cut 

But I want the whole file content to be processed as one record.

Here is my code, which references Reading file as single record in hadoop.

The driver code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CombineSmallfiles {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: combinesmallfiles <in> <out>");
            System.exit(2);
        }

        conf.setInt("mapred.min.split.size", 1);
        conf.setLong("mapred.max.split.size", 26214400); // 25 MB
        //conf.setLong("mapred.max.split.size", 134217728); // 128 MB

        //conf.setInt("mapred.reduce.tasks", 5);

        Job job = new Job(conf, "combine smallfiles");
        job.setJarByClass(CombineSmallfiles.class);
        job.setMapperClass(CombineSmallfileMapper.class);
        //job.setReducerClass(IdentityReducer.class);
        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        MultipleOutputs.addNamedOutput(job, "pkgproc", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "pkgpss", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "pkgsvc", TextOutputFormat.class, Text.class, Text.class);

        job.setInputFormatClass(CombineSmallfileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        int exitFlag = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitFlag);
    }

}
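
(As an aside, mapred.min.split.size and mapred.max.split.size are the pre-Hadoop-2 property names. On a 2.x cluster the same limits are normally set through the mapreduce.* keys; a minimal sketch, assuming Hadoop 2.x:)

    // Equivalent split-size settings under the newer property names
    // (assumption: Hadoop 2.x, where the mapreduce.* namespace replaces mapred.*).
    conf.setLong("mapreduce.input.fileinputformat.split.minsize", 1L);
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 26214400L); // 25 MB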

My mapper code:

public class CombineSmallfileMapper extends Mapper<NullWritable, Text, Text, Text> {

    private Text file = new Text();
    private MultipleOutputs mos;
    private String period;
    private Long elapsed;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs(context);
    }

    @Override
    protected void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException {
        String file_name = context.getConfiguration().get("map.input.file.name");
        String[] filename_tokens = file_name.split("_");
        String uuid = filename_tokens[0];
        String[] datetime_tokens;
        try {
            datetime_tokens = filename_tokens[1].split("-");
        } catch (ArrayIndexOutOfBoundsException err) {
            throw new ArrayIndexOutOfBoundsException(file_name);
        }
        String year, month, day, hour, minute, sec, msec;
        year = datetime_tokens[0];
        month = datetime_tokens[1];
        day = datetime_tokens[2];
        hour = datetime_tokens[3];
        minute = datetime_tokens[4];
        sec = datetime_tokens[5];
        msec = datetime_tokens[6];
        String datetime = year + "-" + month + "-" + day + " " + hour + ":" + minute + ":" + sec + "." + msec;
        String content = value.toString();
        String[] lines = content.split("\n");
        for (int u = 0; u < lines.length; u++) {
            String line = lines[u];
            String[] tokens = line.split(",");
            if (tokens[0].equals("period")) {
                period = tokens[1];
                try {
                    long startTime = Long.valueOf(tokens[2]);
                    long endTime = Long.valueOf(tokens[3]);
                    elapsed = endTime - startTime;
                } catch (NumberFormatException err) {
                    throw new NumberFormatException(line);
                }
            } else if (tokens[0].equals("pkgproc")) {
                String proc_info = "";
                try {
                    proc_info += period + "," + String.valueOf(elapsed) + "," + tokens[2] + "," + tokens[3];
                } catch (ArrayIndexOutOfBoundsException err) {
                    throw new ArrayIndexOutOfBoundsException("pkgproc: " + content + " line:" + line);
                }
                for (int i = 4; i < tokens.length; i++) {
                    String[] state_info = tokens[i].split(":");
                    String state = "";
                    state += "," + state_info[0].charAt(0) + "," + state_info[0].charAt(1) + "," + state_info[0].charAt(2) + "," + state_info[1];
                    mos.write("pkgproc", new Text(tokens[1]), new Text(proc_info + state + ',' + uuid + ',' + datetime));
                }
            } else if (tokens[0].equals("pkgpss")) {
                String proc_info = "";
                proc_info += period + "," + String.valueOf(elapsed) + "," + tokens[2] + "," + tokens[3];
                for (int i = 4; i < tokens.length; i++) {
                    String[] state_info = tokens[i].split(":");
                    String state = "";
                    state += "," + state_info[0].charAt(0) + "," + state_info[0].charAt(1) + "," + state_info[0].charAt(2) + "," + state_info[1] + "," + state_info[2] + "," + state_info[3] + "," + state_info[4] + "," + state_info[5] + "," + state_info[6] + "," + state_info[7];
                    mos.write("pkgpss", new Text(tokens[1]), new Text(proc_info + state + ',' + uuid + ',' + datetime));
                }
            } else if (tokens[0].startsWith("pkgsvc")) {
                String[] stateName = tokens[0].split("-");
                String proc_info = "";
                // tokens[2] = uid, tokens[3] = serviceName
                proc_info += stateName[1] + ',' + period + "," + String.valueOf(elapsed) + "," + tokens[2] + "," + tokens[3];
                String opcount = tokens[4];
                for (int i = 5; i < tokens.length; i++) {
                    String[] state_info = tokens[i].split(":");
                    String state = "";
                    state += "," + state_info[0].charAt(0) + "," + state_info[0].charAt(1) + "," + state_info[1];
                    mos.write("pkgsvc", new Text(tokens[1]), new Text(proc_info + state + ',' + opcount + ',' + uuid + ',' + datetime));
                }
            }
        }
    }

}
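
(One side note on the mapper: MultipleOutputs is created in setup() but never closed, so the named outputs may not be flushed when the task finishes. A minimal sketch of the missing override, using the same mos field as above:)

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Close MultipleOutputs so the pkgproc/pkgpss/pkgsvc output files are flushed and committed.
        mos.close();
    }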

My CombineFileInputFormat, which overrides isSplitable and returns false:

public class CombineSmallfileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

    @Override
    public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<NullWritable, Text>((CombineFileSplit) split, context, WholeFileRecordReader.class);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

}

The WholeFileRecordReader:

public class WholeFileRecordReader extends RecordReader<NullWritable, Text> { 
    //private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class); 

     /** The path to the file to read. */ 
     private final Path mFileToRead; 
     /** The length of this file. */ 
     private final long mFileLength; 

     /** The Configuration. */ 
     private final Configuration mConf; 

     /** Whether this FileSplit has been processed. */ 
     private boolean mProcessed; 
     /** Single Text to store the file name of the current file. */ 
    // private final Text mFileName; 
     /** Single Text to store the value of this file (the value) when it is read. */ 
     private final Text mFileText; 

     /** 
     * Implementation detail: This constructor is built to be called via 
     * reflection from within CombineFileRecordReader. 
     * 
     * @param fileSplit The CombineFileSplit that this will read from. 
     * @param context The context for this task. 
     * @param pathToProcess The path index from the CombineFileSplit to process in this record. 
     */ 
     public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context, 
      Integer pathToProcess) { 
     mProcessed = false; 
     mFileToRead = fileSplit.getPath(pathToProcess); 
     mFileLength = fileSplit.getLength(pathToProcess); 
     mConf = context.getConfiguration(); 
     context.getConfiguration().set("map.input.file.name", mFileToRead.getName()); 

     assert 0 == fileSplit.getOffset(pathToProcess); 
     //if (LOG.isDebugEnabled()) { 
      //LOG.debug("FileToRead is: " + mFileToRead.toString()); 
      //LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths()); 

      //try { 
      //FileSystem fs = FileSystem.get(mConf); 
      //assert fs.getFileStatus(mFileToRead).getLen() == mFileLength; 
      //} catch (IOException ioe) { 
      //// oh well, I was just testing. 
      //} 
     //} 

     //mFileName = new Text(); 
     mFileText = new Text(); 
     } 

     /** {@inheritDoc} */ 
     @Override 
     public void close() throws IOException { 
     mFileText.clear(); 
     } 

     /**
      * Returns the current key. The key is unused here, so this is always NullWritable.
      *
      * @return NullWritable.get().
      * @throws IOException never.
      * @throws InterruptedException never.
      */
     @Override 
     public NullWritable getCurrentKey() throws IOException, InterruptedException { 
     return NullWritable.get(); 
     } 

     /**
      * <p>Returns the current value. If the file has been read with a call to nextKeyValue(),
      * this returns the contents of the file as Text. Otherwise, it returns an
      * empty Text.</p>
      *
      * @return A Text containing the contents of the file to read.
      * @throws IOException never.
      * @throws InterruptedException never.
      */
     @Override 
     public Text getCurrentValue() throws IOException, InterruptedException { 
     return mFileText; 
     } 

     /** 
     * Returns whether the file has been processed or not. Since only one record 
     * will be generated for a file, progress will be 0.0 if it has not been processed, 
     * and 1.0 if it has. 
     * 
     * @return 0.0 if the file has not been processed. 1.0 if it has. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public float getProgress() throws IOException, InterruptedException { 
     return (mProcessed) ? (float) 1.0 : (float) 0.0; 
     } 

     /** 
     * All of the internal state is already set on instantiation. This is a no-op. 
     * 
     * @param split The InputSplit to read. Unused. 
     * @param context The context for this task. Unused. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public void initialize(InputSplit split, TaskAttemptContext context) 
      throws IOException, InterruptedException { 
     // no-op. 
     } 

     /**
      * <p>If the file has not already been read, this reads it into memory, so that a call
      * to getCurrentValue() will return the entire contents of this file as Text. Then it
      * returns true. If the file has already been read, it returns false without updating
      * any internal state.</p>
      *
      * @return Whether the file was read or not.
      * @throws IOException if there is an error reading the file.
      * @throws InterruptedException if there is an error.
      */
     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
     if (!mProcessed) { 
      if (mFileLength > (long) Integer.MAX_VALUE) { 
      throw new IOException("File is longer than Integer.MAX_VALUE."); 
      } 
      byte[] contents = new byte[(int) mFileLength]; 

      FileSystem fs = mFileToRead.getFileSystem(mConf); 
      FSDataInputStream in = null; 
      try { 
      // Set the contents of this file. 
      in = fs.open(mFileToRead); 
      IOUtils.readFully(in, contents, 0, contents.length); 
      mFileText.set(contents, 0, contents.length); 

      } finally { 
      IOUtils.closeQuietly(in); 
      } 
      mProcessed = true; 
      return true; 
     } 
     return false; 
     } 

} 

I expect each mapper to parse multiple small files, and each small file must not be split.

However, the code above still cuts (splits) my input files, which causes parsing errors (because my parser splits each line into tokens).

My understanding is that CombineFileInputFormat collects multiple files into one split, and each split is fed to a single mapper, so one mapper can handle multiple files.

In my code, the maximum input split size is set to 25 MB, so I suspect that CombineFileInputFormat cuts the last small file of an input split in order to satisfy the split size limit.

However, I have overridden isSplitable to return false, yet the small files are still being split.

What is the correct way to do this?

Is it possible to specify the number of files per mapper, rather than specifying an input split size?

Answer


Use the setMaxSplitSize() method in the constructor of your input format and it should work; it is better to tell it the split size explicitly:

public class CFInputFormat extends CombineFileInputFormat<FileLineWritable, Text> {

    public CFInputFormat() {
        super();
        setMaxSplitSize(67108864); // 64 MB (the default HDFS block size on older Hadoop releases)
    }

    @Override
    public RecordReader<FileLineWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<FileLineWritable, Text>((CombineFileSplit) split, context, CFRecordReader.class);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}
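
(For completeness, the driver would then reference this input format instead of the question's one; a sketch, assuming the mapper is adjusted to accept the keys produced by the reader. FileLineWritable and CFRecordReader are referenced by the answer but not shown here.)

    // Hypothetical driver wiring for the answer's input format.
    job.setInputFormatClass(CFInputFormat.class);
    job.setMapperClass(CombineSmallfileMapper.class); // mapper signature changed to Mapper<FileLineWritable, Text, Text, Text>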

It still splits my small files, even when I use setMaxSplitSize(67108864); – 2014-12-11 11:24:59