2016-09-20 73 views
0

我是Hadoop的新手,我的map-reduce代碼可以工作,但不會產生任何輸出。這是MapReduce的的信息:Hadoop減少輸入記錄= 0

16/09/20 13:11:40 INFO mapred.JobClient: Job complete: job_201609081210_0078 
16/09/20 13:11:40 INFO mapred.JobClient: Counters: 28 
16/09/20 13:11:40 INFO mapred.JobClient: Map-Reduce Framework 
16/09/20 13:11:40 INFO mapred.JobClient:  Spilled Records=0 
16/09/20 13:11:40 INFO mapred.JobClient:  Map output materialized bytes=1362 
16/09/20 13:11:40 INFO mapred.JobClient:  Reduce input records=0 
16/09/20 13:11:40 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=466248720384 
16/09/20 13:11:40 INFO mapred.JobClient:  Map input records=852032443 
16/09/20 13:11:40 INFO mapred.JobClient:  SPLIT_RAW_BYTES=29964 
16/09/20 13:11:40 INFO mapred.JobClient:  Map output bytes=0 
16/09/20 13:11:40 INFO mapred.JobClient:  Reduce shuffle bytes=1362 
16/09/20 13:11:40 INFO mapred.JobClient:  Physical memory (bytes) snapshot=57472311296 
16/09/20 13:11:40 INFO mapred.JobClient:  Reduce input groups=0 
16/09/20 13:11:40 INFO mapred.JobClient:  Combine output records=0 
16/09/20 13:11:40 INFO mapred.JobClient:  Reduce output records=0 
16/09/20 13:11:40 INFO mapred.JobClient:  Map output records=0 
16/09/20 13:11:40 INFO mapred.JobClient:  Combine input records=0 
16/09/20 13:11:40 INFO mapred.JobClient:  CPU time spent (ms)=2375210 
16/09/20 13:11:40 INFO mapred.JobClient:  Total committed heap usage (bytes)=47554494464 
16/09/20 13:11:40 INFO mapred.JobClient: File Input Format Counters 
16/09/20 13:11:40 INFO mapred.JobClient:  Bytes Read=15163097088 
16/09/20 13:11:40 INFO mapred.JobClient: FileSystemCounters 
16/09/20 13:11:40 INFO mapred.JobClient:  HDFS_BYTES_READ=15163127052 
16/09/20 13:11:40 INFO mapred.JobClient:  FILE_BYTES_WRITTEN=13170190 
16/09/20 13:11:40 INFO mapred.JobClient:  FILE_BYTES_READ=6 
16/09/20 13:11:40 INFO mapred.JobClient: Job Counters 
16/09/20 13:11:40 INFO mapred.JobClient:  Launched map tasks=227 
16/09/20 13:11:40 INFO mapred.JobClient:  Launched reduce tasks=1 
16/09/20 13:11:40 INFO mapred.JobClient:  SLOTS_MILLIS_REDUCES=759045 
16/09/20 13:11:40 INFO mapred.JobClient:  Total time spent by all reduces waiting after reserving slots (ms)=0 
16/09/20 13:11:40 INFO mapred.JobClient:  SLOTS_MILLIS_MAPS=1613259 
16/09/20 13:11:40 INFO mapred.JobClient:  Total time spent by all maps waiting after reserving slots (ms)=0 
16/09/20 13:11:40 INFO mapred.JobClient:  Data-local map tasks=227 
16/09/20 13:11:40 INFO mapred.JobClient: File Output Format Counters 
16/09/20 13:11:40 INFO mapred.JobClient:  Bytes Written=0 

這是啓動MapReduce工作的代碼的代碼:

import java.io.File; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class mp{ 

public static void main(String[] args) throws Exception { 

    Job job1 = new Job(); 
    job1.setJarByClass(mp.class); 
    FileInputFormat.addInputPath(job1, new Path(args[0]));     
    String oFolder = args[0] + "/output"; 
    FileOutputFormat.setOutputPath(job1, new Path(oFolder)); 
    job1.setMapperClass(TransMapper1.class); 
    job1.setReducerClass(TransReducer1.class); 
    job1.setMapOutputKeyClass(LongWritable.class); 
    job1.setMapOutputValueClass(DnaWritable.class); 
    job1.setOutputKeyClass(LongWritable.class); 
    job1.setOutputValueClass(Text.class); 
} 
} 

這裏,它是映射器類(TransMapper1):

import java.io.IOException; 
import java.util.StringTokenizer; 
import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

public class TransMapper1 extends Mapper<LongWritable, Text, LongWritable, DnaWritable> { 

    @Override 
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 

     String line = value.toString(); 
     StringTokenizer tokenizer = new StringTokenizer(line); 
     LongWritable bamWindow = new LongWritable(Long.parseLong(tokenizer.nextToken())); 
     LongWritable read = new LongWritable(Long.parseLong(tokenizer.nextToken())); 
     LongWritable refWindow = new LongWritable(Long.parseLong(tokenizer.nextToken())); 
     IntWritable chr = new IntWritable(Integer.parseInt(tokenizer.nextToken())); 
     DoubleWritable dist = new DoubleWritable(Double.parseDouble(tokenizer.nextToken())); 
     DnaWritable dnaW = new DnaWritable(bamWindow,read,refWindow,chr,dist); 
     context.write(bamWindow,dnaW); 
    } 
} 

這是減速類(TransReducer1):

import java.io.IOException; 
import java.util.ArrayList; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

public class TransReducer1 extends Reducer<LongWritable, DnaWritable, LongWritable, Text> { 

@Override 
public void reduce(LongWritable key, Iterable<DnaWritable> values, Context context) throws IOException, InterruptedException { 

ArrayList<DnaWritable> list = new ArrayList<DnaWritable>(); 
double minDist = Double.MAX_VALUE; 
    for (DnaWritable value : values) { 
      long bamWindow = value.getBamWindow().get(); 
      long read = value.getRead().get(); 
      long refWindow = value.getRefWindow().get(); 
      int chr = value.getChr().get(); 
      double dist = value.getDist().get(); 
      if (dist > minDist) 
       continue; 
      else 
      if (dist < minDist) 
       list.clear(); 
      list.add(new DnaWritable(bamWindow,read,refWindow,chr,dist)); 
      minDist = Math.min(minDist, value.getDist().get()); 
     } 
     for(int i = 0; i < list.size(); i++){ 
      context.write(new LongWritable(list.get(i).getRead().get()),new Text(new DnaWritable(list.get(i).getBamWindow(),list.get(i).getRead(),list.get(i).getRefWindow(),list.get(i).getChr(),list.get(i).getDist()).toString())); 
     } 
    } 
} 

這是DnaWritable類(我因此未放進口部總之它點點):

public class DnaWritable implements Writable { 
    LongWritable bamWindow; 
    LongWritable read; 
    LongWritable refWindow; 
    IntWritable chr; 
    DoubleWritable dist; 

    public DnaWritable(LongWritable bamWindow, LongWritable read, LongWritable refWindow, IntWritable chr, DoubleWritable dist){ 

    this.bamWindow = bamWindow; 
    this.read = read; 
    this.refWindow = refWindow; 
    this.chr = chr; 
    this.dist = dist; 
} 

public DnaWritable(long bamWindow, long read, long refWindow, int chr, double dist){ 
    this.bamWindow = new LongWritable(bamWindow); 
    this.read = new LongWritable(read); 
    this.refWindow = new LongWritable(refWindow); 
    this.chr = new IntWritable(chr); 
    this.dist = new DoubleWritable(dist); 
} 

@Override 
public void write(DataOutput dataOutput) throws IOException { 
    bamWindow.write(dataOutput); 
    read.write(dataOutput); 
    refWindow.write(dataOutput); 
    chr.write(dataOutput); 
    dist.write(dataOutput); 
} 

@Override 
public void readFields(DataInput dataInput) throws IOException { 
     bamWindow.readFields(dataInput); 
     read.readFields(dataInput); 
     refWindow.readFields(dataInput); 
     chr.readFields(dataInput); 
     dist.readFields(dataInput); 
    } 
} 

任何幫助將非常感激。謝謝

+1

你能提供一些示例數據? – cody123

+1

是,樣本輸入數據將在這裏在這裏得到答案,提供輸入 – Koitoer

+0

的二,三線謝謝你們...我只是發現了一個缺陷在我的輸入文件的一個......我希望這解決了我問題。謝謝 –

回答

5

你可以改變你DnaWritable類和測試相同。(處理NPE)

package com.hadoop.intellipaat; 

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.io.Writable; 

public class DnaWritable implements Writable { 

    private Long bamWindow; 
    private Long read; 
    private Long refWindow; 
    private Integer chr; 
    private Double dist; 

    public DnaWritable(Long bamWindow, Long read, Long refWindow, Integer chr, Double dist) { 
     super(); 
     this.bamWindow = bamWindow; 
     this.read = read; 
     this.refWindow = refWindow; 
     this.chr = chr; 
     this.dist = dist; 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     out.writeLong(bamWindow); 
     out.writeLong(read); 
     out.writeLong(refWindow); 
     out.writeInt(chr); 
     out.writeDouble(dist); 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     this.bamWindow = in.readLong(); 
     this.read = in.readLong(); 
     this.refWindow = in.readLong(); 
     this.chr = in.readInt(); 
     this.dist = in.readDouble(); 
    } 

} 
+0

最好避免裝箱原語。我建議將字段更改爲其基本類型。我這樣說只是因爲它看起來不像空值是OPs字段的可能值。 –

+0

我改變了實施,但到目前爲止沒有運氣 –

+0

@Hamid_UMB請提供一些樣本數據?所以我可以檢查。 – cody123

0

我想你沒有正確執行write(DataOutput out)readFields(DataInput in)方法在您的DnaWritable類中。

+0

我也放了DnaWritable代碼。請看看那個。謝謝 –

0

考慮也實現ComparableWritable如下,還添加無參數的構造函數。

public class DnaWritable implements Writable WritableComparable<DnaWritable> { 

//Consider add a non-args constructor 
public DnaWritable(){ 
} 

    //Add this static method as well 
public static DnaWritable read(DataInput in) throws IOException { 
     DnaWritable dnaWritable = new DnaWritable(); 
     dnaWritable.readFields(in); 
     return dnaWritable; 
} 

@Override 
public int compareTo(DnaWritable dnaWritable) { 
     //Put your comparison logic there. 
} 

} 

如果仍然失敗,我log4.properties,所以你可以看到,如果有任何潛在的錯誤,你沒有看到。

的src/main /資源

hadoop.root.logger=DEBUG, console 
log4j.rootLogger=INFO, stdout 

# Direct log messages to stdout 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.Target=System.out 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n 
+0

當我試圖實現WritableComparable接口時,出現此錯誤:錯誤:非靜態方法readFields(DataInput)不能從靜態上下文中引用DnaWritable.readFields(in); ..。我怎麼解決它? –

+0

它應該是dnaWritable,小寫字母不好,應該是最近創建的實例。 – Koitoer

1

我不認爲你已經在所有提交的作業到集羣。在主類中沒有job1.submit()或job1.waitForCompletion(true)。

////submit the job to hadoop 
if (!job1.waitForCompletion(true)) 
return; 

還有你的主要方法需要更正。

Job job1 = new Job(); //new Job() constructor is deprecated now. 

下面是正確的,以創建一個作業對象

Configuration conf = new Configuration(); 
Job job1 = Job.getInstance(conf, "Your Program name");