I developed a MapReduce job that runs fine in IntelliJ and produces the expected output. When I run the same code on the cluster, the job gives an empty result, and I keep getting this error:

15/07/21 08:28:04 INFO mapreduce.Job: Task Id : attempt_1436660204513_0254_m_000000_0, Status : FAILED Error: Plink/PlinkMapper : Unsupported major.minor version 51.0

Additionally, the shuffle appears to fail at the final merge:

Map-Reduce Framework 
       Map input records=18858 
       Map output records=0 
       Map output bytes=0 

I compile and run with the same JDK version, so I don't understand why I keep getting this error. Please find my code below.

Driver:

package Plink; 

/** 
* Created by Sai Bharath on 7/21/2015. 
*/ 
import Utils.PlinkConstants; 
import Utils.PlinkDataSetDto; 
import Utils.PlinkDto; 
import Utils.PropertyUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.filecache.DistributedCache; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
/** 
* Created by bvarre on 10/29/2014. 
*/ 
public class PlinkDriver extends Configured implements Tool { 

    @Override 
    public int run(String[] args) throws Exception { 


     if (args.length < 6) { 
      System.err.printf("Usage: %s [generic options] <input> <output>\n", 
        getClass().getSimpleName()); 
      ToolRunner.printGenericCommandUsage(System.err); 
      return -1; 
     } 
     Job job = new Job(); 
     Configuration conf=job.getConfiguration(); 
     conf.set("mapred.child.java.opts","-Xmx8g"); 




     job.setJarByClass(PlinkDriver.class); 


     PropertyUtils.setConfigFromSystemProperty(job.getConfiguration()); 

     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileInputFormat.addInputPath(job, new Path(args[1])); 

     FileOutputFormat.setOutputPath(job, new Path(args[2])); 


     if(args[3] != null && !args[3].isEmpty() && PlinkConstants.LOCAL_FILE_INPUT.equalsIgnoreCase(args[3])){ 
      job.getConfiguration().set("snip.codes", args[4]); 
      job.getConfiguration().set("gene.codes", args[5]); 
     } 
     else { 

      DistributedCache.addCacheFile(new Path(args[4]).toUri(), job.getConfiguration()); 
      DistributedCache.addCacheFile(new Path(args[5]).toUri(), job.getConfiguration()); 

      DistributedCache.createSymlink(conf); 
     } 


     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     job.setMapperClass(PlinkMapper.class); 
     // job.setCombinerClass(PlinkCombiner.class); 
     job.setReducerClass(PlinkReducer.class); 

     //Setup Partitioner 
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(PlinkDto.class); 

     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     return job.waitForCompletion(true) ? 0 : 1; 
    } 

    public static void main(String[] args) throws Exception { 
     int exitCode = ToolRunner.run(new PlinkDriver(),args); 
     System.exit(exitCode); 
    } 
} 

Mapper:

package Plink; 

import Utils.PlinkDataSetDto; 
import Utils.PlinkDto; 
import Utils.PlinkResourceBundle; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

import java.io.IOException; 
import java.util.*; 

public class PlinkMapper extends Mapper<Object, Text, Text, PlinkDto> { 


    private List<String> snipCodes = new ArrayList<String>(); 
    private List<String> geneCodes = new ArrayList<String>(); 

    private String domain; 


    @Override 
    protected void setup(Context context) throws IOException, 
      InterruptedException { 
     super.setup(context); 
     Configuration conf = context.getConfiguration(); 

     snipCodes = PlinkResourceBundle.getCodes(conf, "snip.codes"); 
     geneCodes = PlinkResourceBundle.getCodes(conf, "gene.codes"); 

     System.out.println(" snip code size in nMapper :: " + snipCodes.size()); 
     System.out.println(" gene code size in nMapper :: " + geneCodes.size()); 
    } 

    @Override 
    protected void map(Object key, Text value, 
         Context context) throws IOException, InterruptedException { 

     try { 

      String str = (value.toString()); 
      if (str != null && !str.equals("")) { 
       List<String> items = Arrays.asList(str.split("\\s+")); 
       if(items!=null && items.size()>=3) { 
        List<PlinkDto> snipList = new ArrayList<PlinkDto>(); 
        List<PlinkDto> geneList = new ArrayList<PlinkDto>(); 
        Text plinkKey = new Text(); 
        plinkKey.set(items.get(0)); 
        if(!items.get(2).equalsIgnoreCase("null") && !items.get(2).equalsIgnoreCase("na")) { 
         PlinkDto plinkDto = new PlinkDto(); 
         plinkDto.setCodeDesc(items.get(1)); 
         plinkDto.setCodeValue(new Float(items.get(2))); 
         if (snipCodes.contains(items.get(1))) { 
          plinkDto.setCode("SNIP"); 
          snipList.add(plinkDto); 
         } else if (geneCodes.contains(items.get(1))) { 
          plinkDto.setCode("GENE"); 
          geneList.add(plinkDto); 
         } 
         context.write(plinkKey,plinkDto); 
        } 
       } 
      } 
     }catch(Exception ex){ 
      //Collecting Patient data 
      ex.printStackTrace(); 
     } 

    } 
} 

Reducer:

package Plink; 

/** 
* Created by Sai Bharath on 7/15/2015. 
*/ 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Iterator; 
import java.util.List; 

import Utils.PlinkDataSetDto; 
import Utils.PlinkDto; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 



public class PlinkReducer extends Reducer<Text, PlinkDto, Text, Text> { 


    @Override 
    public void reduce(Text key, Iterable<PlinkDto> values, Context context) 
      throws IOException, InterruptedException { 
     List<PlinkDto> snipList = new ArrayList<PlinkDto>(); 
     List<PlinkDto> geneList = new ArrayList<PlinkDto>(); 
     Iterator<PlinkDto> it=values.iterator(); 
     while (it.hasNext()) { 
      PlinkDto tempDto = it.next(); 
      if (tempDto.getCode().equalsIgnoreCase("SNIP")) { 
       PlinkDto snipDto = new PlinkDto(); 
       snipDto.setCode(tempDto.getCode()); 
       snipDto.setCodeDesc(tempDto.getCodeDesc()); 
       snipDto.setCodeValue(tempDto.getCodeValue()); 
       snipList.add(snipDto); 
      } else if (tempDto.getCode().equalsIgnoreCase("GENE")) { 
       PlinkDto geneDto = new PlinkDto(); 
       geneDto.setCode(tempDto.getCode()); 
       geneDto.setCodeDesc(tempDto.getCodeDesc()); 
       geneDto.setCodeValue(tempDto.getCodeValue()); 
       geneList.add(geneDto); 
      } 
     } 

     for(PlinkDto snip:snipList){ 
      for(PlinkDto gene:geneList){ 
       PlinkDataSetDto dataSetDto = new PlinkDataSetDto(); 
       dataSetDto.setSnipCodeDesc(snip.getCodeDesc()); 
       dataSetDto.setGeneCodeDesc(gene.getCodeDesc()); 
       dataSetDto.setSnipCodeValue(snip.getCodeValue()); 
       dataSetDto.setGeneCodeValue(gene.getCodeValue()); 
       Text output = new Text(); 
       output.set(dataSetDto.toString()); 
       context.write(key,output); 
      } 
     } 

    } 
} 

PlinkResourceBundle:

package Utils; 

import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.IOException; 
import java.text.ParseException; 
import java.text.SimpleDateFormat; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.filecache.DistributedCache; 
import org.apache.hadoop.fs.Path; 

import org.apache.hadoop.mapreduce.Job; 

import java.util.*; 

public class PlinkResourceBundle { 

    private PlinkResourceBundle() { 
    } 

    public static List<String> getCodes(Configuration conf, String codeType) throws IOException { 
     List<String> codeList = new ArrayList<String>(); 
     try { 
      String inFile = conf.get(codeType); 

      if (inFile != null) { 
       List<String> lines = HdfsUtils.readFile(inFile); 
       for (String line : lines) { 
        if (line != null && line.length() > 0) { 
         codeList.add(line.trim()); 
        } 
       } 
      } else { 
       Path[] cachefiles = DistributedCache.getLocalCacheFiles(conf); 
       if (cachefiles.length > 0) { 
        BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString())); 
        String line; 
        while ((line = reader.readLine()) != null) { 
         codeList.add((line.trim())); 
        } 
       } 
      } 
     } 
     catch (Exception ex) { 
      System.out.println("Error in getting snip/gene codes " + ex.getMessage()); 
     } 
     return codeList; 
    }//end of method 
} 

pom.xml:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>Plink</groupId> 
    <artifactId>Plink</artifactId> 
    <version>1.0-SNAPSHOT</version> 

    <properties> 
     <jdkLevel>1.7</jdkLevel> 
     <requiredMavenVersion>[3.3,)</requiredMavenVersion> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <project.build.outputEncoding>UTF-8</project.build.outputEncoding> 

    </properties> 

    <distributionManagement> 
     <repository> 
      <id>code-artifacts</id> 
      <url> 
       http://code/artifacts/content/repositories/releases 
      </url> 
     </repository> 
     <snapshotRepository> 
      <id>code-artifacts</id> 
      <url> 
       http://code/artifacts/content/repositories/snapshots 
      </url> 
     </snapshotRepository> 
    </distributionManagement> 

    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-surefire-plugin</artifactId> 
       <version>2.18.1</version> 
       <configuration> 
        <skipTests>true</skipTests> 
       </configuration> 
      </plugin> 

      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>3.3</version> 
       <configuration> 
        <source>${jdkLevel}</source> 
        <target>${jdkLevel}</target> 
        <showDeprecation>true</showDeprecation> 
        <showWarnings>true</showWarnings> 
       </configuration> 
       <dependencies> 
        <dependency> 
         <groupId>org.codehaus.groovy</groupId> 
         <artifactId>groovy-eclipse-compiler</artifactId> 
         <version>2.9.2-01</version> 
        </dependency> 

        <dependency> 
         <groupId>org.codehaus.groovy</groupId> 
         <artifactId>groovy-eclipse-batch</artifactId> 
         <version>2.4.3-01</version> 
        </dependency> 
       </dependencies> 
      </plugin> 
      <plugin> 
       <artifactId>maven-dependency-plugin</artifactId> 
       <executions> 
        <execution> 
         <phase>package</phase> 
         <goals> 
          <goal>copy-dependencies</goal> 
         </goals> 
         <configuration> 
          <outputDirectory>${project.build.directory}/lib</outputDirectory> 

          <includeScope>provided</includeScope> 
          <includeScope>compile</includeScope> 
         </configuration> 
        </execution> 
       </executions> 
      </plugin> 

     </plugins> 
    </build> 

    <repositories> 
     <repository> 
      <releases> 
       <enabled>true</enabled> 
       <updatePolicy>always</updatePolicy> 
       <checksumPolicy>warn</checksumPolicy> 
      </releases> 
      <snapshots> 
       <enabled>false</enabled> 
       <updatePolicy>never</updatePolicy> 
       <checksumPolicy>fail</checksumPolicy> 
      </snapshots> 
      <id>HDPReleases</id> 
      <name>HDP Releases</name> 
      <url>http://repo.hortonworks.com/content/repositories/releases/</url> 
      <layout>default</layout> 
     </repository> 
    </repositories> 

    <dependencies> 
     <dependency> 
      <groupId>commons-logging</groupId> 
      <artifactId>commons-logging</artifactId> 
      <version>1.2</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-client</artifactId> 
      <version>2.6.0.2.2.4.2-2</version> 
      <scope>provided</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.oozie</groupId> 
      <artifactId>oozie-core</artifactId> 
      <version>4.1.0</version> 
      <scope>provided</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-core</artifactId> 
      <version>0.20.2</version> 

     </dependency> 
     <dependency> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
      <version>1.2.17</version> 
     </dependency> 

     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-api</artifactId> 
      <version>1.7.5</version> 
     </dependency> 
     <dependency> 
      <groupId>org.testng</groupId> 
      <artifactId>testng</artifactId> 
      <version>6.8.7</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.mrunit</groupId> 
      <artifactId>mrunit</artifactId> 
      <version>1.0.0</version> 
      <classifier>hadoop2</classifier> 
     </dependency> 
     <dependency> 
      <groupId>org.mockito</groupId> 
      <artifactId>mockito-core</artifactId> 
      <version>1.9.5</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>commons-cli</groupId> 
      <artifactId>commons-cli</artifactId> 
      <version>1.2</version> 
     </dependency> 
     <dependency> 
      <groupId>commons-httpclient</groupId> 
      <artifactId>commons-httpclient</artifactId> 
      <version>3.1</version> 
     </dependency> 
    </dependencies> 

</project> 

15/07/22 08:41:57 INFO mapreduce.Job: map 0% reduce 0% 
15/07/22 08:42:06 INFO mapreduce.Job: map 100% reduce 0% 
15/07/22 08:42:13 INFO mapreduce.Job: map 100% reduce 100% 
15/07/22 08:42:13 INFO mapreduce.Job: Job job_1436660204513_0286 completed successfully 
15/07/22 08:42:13 INFO mapreduce.Job: Counters: 50 
     File System Counters 
       FILE: Number of bytes read=6 
       FILE: Number of bytes written=364577 
       FILE: Number of read operations=0 
       FILE: Number of large read operations=0 
       FILE: Number of write operations=0 
       HDFS: Number of bytes read=604494 
       HDFS: Number of bytes written=0 
       HDFS: Number of read operations=9 
       HDFS: Number of large read operations=0 
       HDFS: Number of write operations=2 
     Job Counters 
       Launched map tasks=2 
       Launched reduce tasks=1 
       Other local map tasks=1 
       Rack-local map tasks=1 
       Total time spent by all maps in occupied slots (ms)=13453 
       Total time spent by all reduces in occupied slots (ms)=9188 
       Total time spent by all map tasks (ms)=13453 
       Total time spent by all reduce tasks (ms)=4594 
       Total vcore-seconds taken by all map tasks=13453 
       Total vcore-seconds taken by all reduce tasks=4594 
       Total megabyte-seconds taken by all map tasks=27551744 
       Total megabyte-seconds taken by all reduce tasks=18817024 
     Map-Reduce Framework 
       Map input records=18858 
       Map output records=0 
       Map output bytes=0 
       Map output materialized bytes=12 
       Input split bytes=266 
       Combine input records=0 
       Combine output records=0 
       Reduce input groups=0 
       Reduce shuffle bytes=12 
       Reduce input records=0 
       Reduce output records=0 
       Spilled Records=0 
       Shuffled Maps =2 
       Failed Shuffles=0 
       Merged Map outputs=2 
       GC time elapsed (ms)=118 
       CPU time spent (ms)=10260 
       Physical memory (bytes) snapshot=1023930368 
       Virtual memory (bytes) snapshot=9347194880 
       Total committed heap usage (bytes)=5474615296 
     Shuffle Errors 
       BAD_ID=0 
       CONNECTION=0 
       IO_ERROR=0 
       WRONG_LENGTH=0 
       WRONG_MAP=0 
       WRONG_REDUCE=0 
     File Input Format Counters 
       Bytes Read=604228 
     File Output Format Counters 
       Bytes Written=0 

Can anyone help me?

Answer


This looks like an incompatibility with the runtime Java VM on the cluster. "Unsupported major.minor version 51.0" means the class file for PlinkMapper requires at least a Java 7 virtual machine. I would suggest confirming which JRE version the cluster's task JVMs are actually running.
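
A quick way to confirm this (a minimal diagnostic sketch, not part of the original job; the class name is an assumption) is to log the JVM version from inside a task so it shows up in the task attempt's stdout log:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical diagnostic mapper: prints the JVM version each task actually runs on.
// The output appears in the task attempt's stdout log.
public class VersionProbeMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("Task JVM version: " + System.getProperty("java.version"));
        System.out.println("Task JVM home:    " + System.getProperty("java.home"));
    }

    @Override
    protected void map(Object key, Text value, Context context) {
        // no-op: this mapper exists only to log the runtime JVM version
    }
}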

For reference, the major version numbers defined in class files are listed below; a small sketch for checking a compiled class's version follows the list.

  • J2SE 8 = 52
  • J2SE 7 = 51
  • J2SE 6.0 = 50
  • J2SE 5.0 = 49
  • JDK 1.4 = 48
  • JDK 1.3 = 47
  • JDK 1.2 = 46
  • JDK 1.1 = 45
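
If it helps, a compiled class's target version can be checked directly, either with "javap -verbose Plink/PlinkMapper.class" (look at the "major version" line) or with a small sketch like the one below; the file path is an assumption and should point at the class extracted from the job jar:

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

// Minimal sketch: prints a class file's major version (50 = Java 6, 51 = Java 7, 52 = Java 8).
// Kept Java 6 compatible so it can also run on an older cluster JVM.
public class ClassVersionCheck {
    public static void main(String[] args) throws IOException {
        // Hypothetical default path; pass the real location as the first argument.
        String path = args.length > 0 ? args[0] : "Plink/PlinkMapper.class";
        DataInputStream in = new DataInputStream(new FileInputStream(path));
        try {
            int magic = in.readInt();            // 0xCAFEBABE for a valid class file
            int minor = in.readUnsignedShort();  // minor_version
            int major = in.readUnsignedShort();  // major_version
            System.out.println("magic=0x" + Integer.toHexString(magic).toUpperCase()
                    + ", class file version=" + major + "." + minor);
        } finally {
            in.close();
        }
    }
}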

Thanks for the reply, Andy. I verified this, and both the JDK and the JRE are the same, i.e. a Java 7 VM. – Bharath