2014-10-17 63 views

I want to create my own MapReduce job. How do I define my own Map and Reduce classes?

The output of the Map class is: Text (key), Text (value)

The output of the Reduce class is: Text (key), IntWritable (value)

The way I tried to implement it is as follows:

import java.io.IOException; 
import java.util.*; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class artistandTrack { 

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String line = value.toString();
            String[] names = line.split(" ");
            Text artist_name = new Text(names[2]);
            Text track_name = new Text(names[3]);

            output.collect(artist_name, track_name);
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += 1;
                Text x1 = values.next();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception { 
    JobConf conf = new JobConf(artistandTrack.class); 
    conf.setJobName("artisttrack"); 

    conf.setOutputKeyClass(Text.class); 
    conf.setOutputValueClass(IntWritable.class); 

     conf.setMapOutputKeyClass(Text.class); 
     conf.setMapOutputValueClass(Text.class); 

    conf.setMapperClass(Map.class); 
    conf.setCombinerClass(Reduce.class); 
    conf.setReducerClass(Reduce.class); 

    conf.setInputFormat(TextInputFormat.class); 
    conf.setOutputFormat(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(conf, new Path(args[0])); 
    FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

    JobClient.runJob(conf); 
    } 
} 
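As a side note, the mapper above assumes every input line has at least four space-separated fields, so a shorter line would throw an ArrayIndexOutOfBoundsException inside the map task. A Hadoop-free sketch of a more defensive parse (the class and method names here are illustrative, not from the original code):

```java
import java.util.Arrays;

public class LineParser {
    // Extract (artist, track) from a whitespace-delimited line, using the
    // same field positions as the mapper (indexes 2 and 3). Returns null
    // for lines that are too short instead of throwing
    // ArrayIndexOutOfBoundsException.
    static String[] artistAndTrack(String line) {
        String[] names = line.split(" ");
        if (names.length < 4) {
            return null;
        }
        return new String[] { names[2], names[3] };
    }

    public static void main(String[] args) {
        // → [someArtist, someTrack]
        System.out.println(Arrays.toString(
                artistAndTrack("id1 ts1 someArtist someTrack")));
        // → null
        System.out.println(artistAndTrack("too short"));
    }
}
```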

When I try to run it, it shows the following output and terminates:

WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
14/10/17 06:09:15 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id 
14/10/17 06:09:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 
14/10/17 06:09:16 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized 
14/10/17 06:09:18 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
14/10/17 06:09:19 INFO mapred.FileInputFormat: Total input paths to process : 1 
14/10/17 06:09:19 INFO mapreduce.JobSubmitter: number of splits:1 
14/10/17 06:09:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local803195645_0001 
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/staging/userloki803195645/.staging/job_local803195645_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring. 
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/staging/userloki803195645/.staging/job_local803195645_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring. 
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/local/localRunner/userloki/job_local803195645_0001/job_local803195645_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring. 
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/local/localRunner/userloki/job_local803195645_0001/job_local803195645_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring. 
14/10/17 06:09:20 INFO mapreduce.Job: The url to track the job: http://localhost:8080/ 
14/10/17 06:09:20 INFO mapreduce.Job: Running job: job_local803195645_0001 
14/10/17 06:09:20 INFO mapred.LocalJobRunner: OutputCommitter set in config null 
14/10/17 06:09:20 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter 
14/10/17 06:09:20 INFO mapred.LocalJobRunner: Waiting for map tasks 
14/10/17 06:09:20 INFO mapred.LocalJobRunner: Starting task: attempt_local803195645_0001_m_000000_0 
14/10/17 06:09:20 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 
14/10/17 06:09:20 INFO mapred.MapTask: Processing split: hdfs://localhost:54310/project5/input/sad.txt:0+272 
14/10/17 06:09:21 INFO mapred.MapTask: numReduceTasks: 1 
14/10/17 06:09:21 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
14/10/17 06:09:21 INFO mapreduce.Job: Job job_local803195645_0001 running in uber mode : false 
14/10/17 06:09:21 INFO mapreduce.Job: map 0% reduce 0% 
14/10/17 06:09:22 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584) 
14/10/17 06:09:22 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100 
14/10/17 06:09:22 INFO mapred.MapTask: soft limit at 83886080 
14/10/17 06:09:22 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600 
14/10/17 06:09:22 INFO mapred.MapTask: kvstart = 26214396; length = 6553600 
14/10/17 06:09:25 INFO mapred.LocalJobRunner: 
14/10/17 06:09:25 INFO mapred.MapTask: Starting flush of map output 
14/10/17 06:09:25 INFO mapred.MapTask: Spilling map output 
14/10/17 06:09:25 INFO mapred.MapTask: bufstart = 0; bufend = 120; bufvoid = 104857600 
14/10/17 06:09:25 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214384(104857536); length = 13/6553600 
14/10/17 06:09:25 INFO mapred.LocalJobRunner: map task executor complete. 
14/10/17 06:09:25 WARN mapred.LocalJobRunner: job_local803195645_0001 
***java.lang.Exception: java.io.IOException: wrong value class: class   org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text 
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) 
Caused by: java.io.IOException: wrong value class: class org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text 
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:199) 
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1307) 
at artistandTrack$Reduce.reduce(artistandTrack.java:44) 
at artistandTrack$Reduce.reduce(artistandTrack.java:37) 
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1572) 
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1611) 
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1462) 
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:437) 
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342) 
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745)*** 


14/10/17 06:09:26 INFO mapreduce.Job: Job job_local803195645_0001 failed with state FAILED due to: NA 
14/10/17 06:09:26 INFO mapreduce.Job: Counters: 11 
Map-Reduce Framework 
    Map input records=4 
    Map output records=4 
    Map output bytes=120 
    Map output materialized bytes=0 
    Input split bytes=97 
    Combine input records=0 
    Combine output records=0 
    Spilled Records=0 
    Failed Shuffles=0 
    Merged Map outputs=0 
File Input Format Counters 
    Bytes Read=272 
Exception in thread "main" java.io.IOException: Job failed! 
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:836) 
at artistandTrack.main(artistandTrack.java:68) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.hadoop.util.RunJar.main(RunJar.java:212) 

Where is the "wrong value class" coming from?

java.lang.Exception: java.io.IOException: wrong value class: class   org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text 

And why does the job fail?

Exception in thread "main" java.io.IOException: Job failed! 
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:836) 
at artistandTrack.main(artistandTrack.java:68) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.hadoop.util.RunJar.main(RunJar.java:212) 

I don't understand where it goes wrong. Any help is appreciated.

Answers

I think the problem is in this line:

conf.setCombinerClass(Reduce.class); 

Map produces (Text, Text) pairs; your combiner consumes those and produces (Text, IntWritable) pairs. But a combiner's output is written back into the map-output buffer, which is declared to hold Text values, so writing an IntWritable there throws the exception. Try removing the Combiner setting.
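The type constraint behind that exception can be illustrated without Hadoop: a combiner must have the same output types as the map output, while the final reducer is free to change the value type. A plain-Java sketch of this contract (the interface and class names are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;

public class CombinerTypes {
    // Stand-in for Hadoop's Reducer<K2, V2, K3, V3> contract.
    interface Reducer<K2, V2, K3, V3> {
        V3 reduce(K2 key, List<V2> values);
    }

    // The final reducer may change the value type (String -> Integer)...
    static class CountReducer implements Reducer<String, String, String, Integer> {
        public Integer reduce(String key, List<String> values) {
            return values.size(); // number of tracks for this artist
        }
    }

    // ...but a combiner's output is fed back into the (String, String)-typed
    // map-output stream, so a valid combiner here would have to be a
    // Reducer<String, String, String, String>. CountReducer does not fit
    // that shape, which is the "wrong value class" error reported above.

    public static void main(String[] args) {
        // → 2
        System.out.println(new CountReducer()
                .reduce("artistA", Arrays.asList("track1", "track2")));
    }
}
```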

OK, I removed the combiner line, but the problem is still the same – numanumu 2014-10-17 09:31:30

Should the data types of the input (key, value) pair and the output (key, value) pair of the reducer class be the same, or can they be different? – numanumu 2014-10-17 09:55:01

They can be different. Does this exception still appear in the output: '*** java.lang.Exception: java.io.IOException: wrong value class: class org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text'? – 2014-10-17 11:17:52

Use the main() below:

public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(artistandTrack.class);
    conf.setJobName("artisttrack");

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setMapperClass(Map.class);
    //conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    //conf.setOutputKeyClass(Text.class);
    //conf.setOutputValueClass(IntWritable.class);

    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(Text.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    JobClient.runJob(conf);
}
Tried it, but the result is the same – numanumu 2014-10-17 10:18:28

Last idea: remove conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(Text.class); conf.setCombinerClass(Reduce.class); – www 2014-10-17 11:15:44

Tried it; it didn't help. Any idea why the exception persists? – numanumu 2014-10-17 13:49:10