2015-02-11 60 views
0

是否可以從*mapred*.JobConf創建有效的*mapreduce*.TaskAttemptIDHadoop - 如何從mapred.JobConf中提取taskId?

背景

我需要寫一個FileInputFormatAdapterExistingFileInputFormat。問題是適配器需要擴展mapred.InputFormat,現有格式擴展爲mapreduce.InputFormat

我需要構建一個mapreduce.TaskAttemptContextImpl,以便我可以實例化ExistingRecordReader。但是,我無法創建有效的TaskId ... taskId以空值出現。

那麼如何從mapred.JobConf獲得taskId,jobId等。

特別是在適配器的getRecordReader我需要做的是這樣的:

public org.apache.hadoop.mapred.RecordReader<NullWritable, MyWritable> getRecordReader(
     org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter) throws IOException { 

    SplitAdapter splitAdapter = (SplitAdapter) split; 

    final Configuration conf = job; 

    /*************************************************/ 
    //The problem is here, "mapred.task.id" is not in the conf 
    /*************************************************/ 
    final TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id")); 

    final TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId); 
    try { 
     return new RecordReaderAdapter(new ExistingRecordReader(
       splitAdapter.getMapRedeuceSplit(), 
       context)); 
    } catch (InterruptedException e) { 
     throw new RuntimeException("Failed to create record-reader.", e); 
    } 
} 

此代碼拋出一個異常:

Caused by: java.lang.NullPointerException 
    at org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.<init>(TaskAttemptContextImpl.java:44) 
    at org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.<init>(TaskAttemptContextImpl.java:39) 

'超(CONF,taskId.getJobID());'拋出異常,很可能是因爲taskId爲空。

回答

1

我通過查找HiveHbaseTableInputFormat找到答案。由於我的解決方案是針對配置單元的,因此完美地工作。

TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
     job.getConfiguration(), reporter);