2013-03-24 53 views
0

標題:在Hadoop MapReduce中讓RecordReader以段落爲單位返回記錄

我想編寫自己的RecordReader,讓它把整個段落作爲一條記錄返回,而不是像TextInputFormat那樣逐行返回。

我嘗試了以下函數,但它肯定差得很遠:

/**
 * Attempted paragraph-reading version of {@code nextKeyValue()}.
 *
 * <p>NOTE(review): as written, this method never produces a record. No call
 * that reads from an input source appears in the body, and {@code newSize}
 * is initialised to 0 and never reassigned, so the loop exits on its first
 * iteration and the method always ends up returning {@code false}.
 *
 * <p>The fields {@code key}, {@code value}, {@code pos} and
 * {@code maxLineLength} are declared outside this block.
 *
 * @return {@code true} if a record was produced; with the current body this
 *         is always {@code false}
 * @throws IOException declared, but nothing in this body performs I/O
 * @throws InterruptedException declared, but nothing in this body blocks
 */
public boolean nextKeyValue() throws IOException, InterruptedException { 
    // Lazily create the key and record the current position in it.
    if (key == null) { 
     key = new LongWritable(); 
    } 
    key.set(pos); 
    // Lazily create the value and reset it before accumulating a new record.
    if (value == null) { 
     value = new Text(); 
    } 
    value.clear(); 
    final Text endline = new Text("\n"); 
    // Never updated below: stays 0 for the whole method.
    int newSize = 0; 

     // Empty scratch buffer; nothing in this block ever fills it with a line.
     Text v = new Text(); 
     // NOTE(review): `!=` compares object references, not contents. `v` and
     // `endline` are distinct objects, so this condition is always true;
     // a content comparison would need equals().
     while (v!= endline) { 
      // Appends the (empty) scratch buffer followed by a newline to the value.
      value.append(v.getBytes(),0, v.getLength()); 
      value.append(endline.getBytes(),0, endline.getLength()); 
      // Always taken on the first iteration because newSize is still 0.
      if (newSize == 0) { 
       break; 
      } 
      // Unreachable with the current body (see the break above).
      pos += newSize; 
      if (newSize < maxLineLength) { 
       break; 
      } 
     } 
    // newSize is 0 here, so key and value are discarded and false is returned.
    if (newSize == 0) { 
     key = null; 
     value = null; 
     return false; 
    } else { 
     return true; 
    } 
} 
+0

你說的「差得很遠」是什麼意思?你遇到了什麼問題? – 2013-03-24 00:20:13

+0

您還需要找到一種方法來定義段落邊界 - 它們是空行分隔的還是縮進的新段落的第一句(tab或whitespace)? – 2013-03-24 13:40:38

回答

0

你其實不必費力編寫自己的RecordReader。只需擴展TextInputFormat並更改記錄分隔符即可。以下代碼基於TextInputFormat的庫代碼,僅更改了分隔符:

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 

import com.google.common.base.Charsets; 

/**
 * A {@link TextInputFormat} variant whose records are delimited by
 * {@link #PARAGRAPH_DELIMITER} instead of a single line terminator, so that
 * each record handed to the mapper is a whole paragraph.
 *
 * <p>Keys and values keep the types used by {@link TextInputFormat}
 * ({@link LongWritable} key, {@link Text} value).
 */
public class ParagraphInputFormat extends TextInputFormat {

    /**
     * Byte sequence that separates two paragraphs: an empty line using CRLF
     * line endings. Input that uses bare {@code \n} line endings will not
     * contain this sequence and therefore will not be broken into paragraphs.
     */
    private static final String PARAGRAPH_DELIMITER = "\r\n\r\n";

    /**
     * Reports every file as non-splittable.
     *
     * @param context the job context (unused)
     * @param file the input file (unused)
     * @return always {@code false}
     */
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    /**
     * Creates a {@link LineRecordReader} configured with the paragraph
     * delimiter encoded as UTF-8.
     *
     * @param split the split to read (not inspected here)
     * @param context the task attempt context (not inspected here)
     * @return a record reader that uses {@link #PARAGRAPH_DELIMITER} as its
     *         record delimiter
     */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        // The delimiter is a non-null constant, so no null check is needed.
        // A fresh array is encoded per reader so that readers never share it.
        byte[] recordDelimiterBytes = PARAGRAPH_DELIMITER.getBytes(Charsets.UTF_8);
        return new LineRecordReader(recordDelimiterBytes);
    }
}