2012-07-25 111 views

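Type mismatch in key from map despite correct use of SequenceFileInputFormat

I am trying to run the recommender example from chapter 6 (listings 6.1 to 6.4) of the ebook Mahout in Action. There are two mapper/reducer pairs. Here is the code: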

Mapper - 1

public class WikipediaToItemPrefsMapper extends 
     Mapper<LongWritable,Text,VarLongWritable,VarLongWritable> { 

private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

@Override 
public void map(LongWritable key, 
      Text value, 
      Context context) 
throws IOException, InterruptedException { 

    String line = value.toString(); 
    Matcher m = NUMBERS.matcher(line); 
    if (!m.find()) { 
     return; // no numeric IDs on this line, so there is nothing to emit 
    } 
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group())); 
    VarLongWritable itemID = new VarLongWritable(); 
    while (m.find()) { 
     itemID.set(Long.parseLong(m.group())); 
     context.write(userID, itemID); 
    } 
} 

}
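The parsing step of the first mapper can be tried outside Hadoop. The following is a minimal sketch, not the book's code: the class name `PrefLineParser` and the sample line are hypothetical, and it only assumes that each input line holds a user ID followed by item IDs, which is what the mapper above relies on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: mirrors the regex logic of WikipediaToItemPrefsMapper
// without any Hadoop or Mahout dependency.
class PrefLineParser {
    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

    /** Returns [userID, itemID1, itemID2, ...], or an empty list if the line has no numbers. */
    static List<Long> parse(String line) {
        List<Long> ids = new ArrayList<>();
        Matcher m = NUMBERS.matcher(line);
        while (m.find()) {
            ids.add(Long.parseLong(m.group()));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Sample line is an assumption about the input layout, not taken from the data set.
        List<Long> ids = parse("98955: 590 22 9059");
        if (ids.isEmpty()) {
            System.out.println("no IDs found");
            return;
        }
        long userID = ids.get(0);
        for (long itemID : ids.subList(1, ids.size())) {
            // In the real mapper this would be context.write(userID, itemID).
            System.out.println(userID + " -> " + itemID);
        }
    }
}
```

Returning an empty list for a line without digits avoids the exception that `m.group()` raises when no match was found.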

Reducer - 1

public class WikipediaToUserVectorReducer extends 
     Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> { 
@Override 
public void reduce(VarLongWritable userID, 
        Iterable<VarLongWritable> itemPrefs, 
        Context context) 
    throws IOException, InterruptedException { 

     Vector userVector = new RandomAccessSparseVector(
     Integer.MAX_VALUE, 100); 
     for (VarLongWritable itemPref : itemPrefs) { 
      userVector.set((int)itemPref.get(), 1.0f); 
     } 

     //LongWritable userID_lw = new LongWritable(userID.get()); 
     context.write(userID, new VectorWritable(userVector)); 
     //context.write(userID_lw, new VectorWritable(userVector)); 
} 

}

The reducer outputs a userID and a userVector, and the output looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}, provided FileInputFormat and TextInputFormat are used in the driver.

I want to use another mapper/reducer pair to process this data further:

Mapper - 2

public class UserVectorToCooccurenceMapper extends 
Mapper<VarLongWritable,VectorWritable,IntWritable,IntWritable> { 

@Override 
public void map(VarLongWritable userID, 
       VectorWritable userVector, 
       Context context) 
throws IOException, InterruptedException { 

    Iterator<Vector.Element> it = userVector.get().iterateNonZero(); 
    while (it.hasNext()) { 
     int index1 = it.next().index(); 
     Iterator<Vector.Element> it2 = userVector.get().iterateNonZero(); 
     while (it2.hasNext()) { 
      int index2 = it2.next().index(); 
       context.write(new IntWritable(index1), 
           new IntWritable(index2)); 
     } 
    } 
} 

}

Reducer - 2

public class UserVectorToCooccurenceReducer extends 
     Reducer<IntWritable,IntWritable,IntWritable,VectorWritable> { 

@Override 
public void reduce(IntWritable itemIndex1, 
      Iterable<IntWritable> itemIndex2s, 
      Context context) 
throws IOException, InterruptedException { 

    Vector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100); 
    for (IntWritable intWritable : itemIndex2s) { 
     int itemIndex2 = intWritable.get(); 
     cooccurrenceRow.set(itemIndex2, cooccurrenceRow.get(itemIndex2) + 1.0); 
    } 
    context.write(itemIndex1, new VectorWritable(cooccurrenceRow)); 
} 

}
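To make the intent of the second mapper/reducer pair concrete, here is a small in-memory sketch of the same co-occurrence counting. It is an illustration only: `CooccurrenceSketch` and its method are hypothetical names, plain Java maps stand in for Mahout vectors, and each user's item list is assumed to contain distinct item indices.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory version of the co-occurrence job.
class CooccurrenceSketch {

    /** Builds itemIndex1 -> (itemIndex2 -> number of users who have both items). */
    static Map<Integer, Map<Integer, Integer>> count(List<int[]> userItemLists) {
        Map<Integer, Map<Integer, Integer>> matrix = new TreeMap<>();
        for (int[] items : userItemLists) {
            // "Map" step: every ordered pair (i, j) of one user's items,
            // including i == j, as in UserVectorToCooccurenceMapper.
            for (int i : items) {
                for (int j : items) {
                    // "Reduce" step: add 1 to row i, column j.
                    matrix.computeIfAbsent(i, k -> new TreeMap<>())
                          .merge(j, 1, Integer::sum);
                }
            }
        }
        return matrix;
    }

    public static void main(String[] args) {
        List<int[]> users = Arrays.asList(new int[] {1, 2, 3}, new int[] {2, 3});
        System.out.println(count(users));
    }
}
```

The pair emitted by the "map" step has two int components, which is why the second mapper's output types are `IntWritable` and `IntWritable`.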

Here is the driver I am using:

public final class RecommenderJob extends Configured implements Tool { 

@Override 
public int run(String[] args) throws Exception { 

Job job_preferenceValues = new Job (getConf()); 
    job_preferenceValues.setJarByClass(RecommenderJob.class); 
    job_preferenceValues.setJobName("job_preferenceValues"); 

    job_preferenceValues.setInputFormatClass(TextInputFormat.class); 
    job_preferenceValues.setOutputFormatClass(SequenceFileOutputFormat.class); 

    FileInputFormat.setInputPaths(job_preferenceValues, new Path(args[0])); 
    SequenceFileOutputFormat.setOutputPath(job_preferenceValues, new Path(args[1])); 

    job_preferenceValues.setMapOutputKeyClass(VarLongWritable.class); 
    job_preferenceValues.setMapOutputValueClass(VarLongWritable.class); 

    job_preferenceValues.setOutputKeyClass(VarLongWritable.class); 
    job_preferenceValues.setOutputValueClass(VectorWritable.class); 

    job_preferenceValues.setMapperClass(WikipediaToItemPrefsMapper.class); 
    job_preferenceValues.setReducerClass(WikipediaToUserVectorReducer.class); 

    job_preferenceValues.waitForCompletion(true); 

    Job job_cooccurence = new Job (getConf()); 
    job_cooccurence.setJarByClass(RecommenderJob.class); 
    job_cooccurence.setJobName("job_cooccurence"); 

    job_cooccurence.setInputFormatClass(SequenceFileInputFormat.class); 
    job_cooccurence.setOutputFormatClass(TextOutputFormat.class); 

    SequenceFileInputFormat.setInputPaths(job_cooccurence, new Path(args[1])); 
    FileOutputFormat.setOutputPath(job_cooccurence, new Path(args[2])); 

    job_cooccurence.setMapOutputKeyClass(VarLongWritable.class); 
    job_cooccurence.setMapOutputValueClass(VectorWritable.class); 

    job_cooccurence.setOutputKeyClass(IntWritable.class); 
    job_cooccurence.setOutputValueClass(VectorWritable.class); 

    job_cooccurence.setMapperClass(UserVectorToCooccurenceMapper.class); 
    job_cooccurence.setReducerClass(UserVectorToCooccurenceReducer.class); 

    job_cooccurence.waitForCompletion(true); 

    return 0; 

}

public static void main(String[] args) throws Exception { 
ToolRunner.run(new Configuration(), new RecommenderJob(), args); 

} }

The error I get is:

java.io.IOException: Type mismatch in key from map: expected org.apache.mahout.math.VarLongWritable, received org.apache.hadoop.io.IntWritable 

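While googling for a fix, I found that my problem is similar to this question. The difference is that I am already using SequenceFileInputFormat and SequenceFileOutputFormat, correctly as far as I can tell. I also see that org.apache.mahout.cf.taste.hadoop.item.RecommenderJob does more or less the same thing. This matches my understanding and the Yahoo Tutorial:

SequenceFileOutputFormat rapidly serializes arbitrary data types to the file; the corresponding SequenceFileInputFormat will deserialize the file into the same types and present the data to the next Mapper in the same manner as it was emitted by the previous Reducer.

What am I doing wrong? I would really appreciate some pointers. I have spent the whole day trying to fix this and got nowhere :(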

Answer


Your second mapper has the following signature:

public class UserVectorToCooccurenceMapper extends 
     Mapper<VarLongWritable,VectorWritable,IntWritable,IntWritable> 

But your driver code declares the map output types as follows:

job_cooccurence.setMapOutputKeyClass(VarLongWritable.class); 
job_cooccurence.setMapOutputValueClass(VectorWritable.class); 

The mapper emits <IntWritable, IntWritable> and the reducer expects the same types as input, so you only need to change your driver code to:

job_cooccurence.setMapOutputKeyClass(IntWritable.class); 
job_cooccurence.setMapOutputValueClass(IntWritable.class); 
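The error message comes from a runtime check: as far as I understand it, the framework compares the class of every key and value the mapper emits with the classes declared in the driver. The sketch below imitates that check in plain Java. It is a simplified stand-in, not Hadoop's code: `Collector` and `tryCollect` are hypothetical names, and `Long` and `Integer` play the roles of `VarLongWritable` and `IntWritable`.

```java
import java.io.IOException;

// Hypothetical simulation of the map-output type check; not Hadoop's implementation.
class MapOutputTypeCheck {

    /** Stand-in for the collector that receives what the mapper writes. */
    static final class Collector {
        private final Class<?> keyClass;   // like setMapOutputKeyClass(...)
        private final Class<?> valueClass; // like setMapOutputValueClass(...)

        Collector(Class<?> keyClass, Class<?> valueClass) {
            this.keyClass = keyClass;
            this.valueClass = valueClass;
        }

        void collect(Object key, Object value) throws IOException {
            if (key.getClass() != keyClass) {
                throw new IOException("Type mismatch in key from map: expected "
                        + keyClass.getName() + ", received " + key.getClass().getName());
            }
            if (value.getClass() != valueClass) {
                throw new IOException("Type mismatch in value from map: expected "
                        + valueClass.getName() + ", received " + value.getClass().getName());
            }
        }
    }

    /** Returns "ok" if the pair is accepted, otherwise the error message. */
    static String tryCollect(Class<?> keyClass, Class<?> valueClass, Object key, Object value) {
        try {
            new Collector(keyClass, valueClass).collect(key, value);
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Declared Long keys, but the "mapper" emits Integer keys: rejected.
        System.out.println(tryCollect(Long.class, String.class, 1, "x"));
        // Declared types match the emitted types: accepted.
        System.out.println(tryCollect(Integer.class, Integer.class, 1, 2));
    }
}
```

The input types of the second mapper are read back from the sequence file, so they were never the problem; only the declared map output types disagreed with what the mapper writes.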

Thanks a lot. I feel lame for making that mistake :) – Alps 2012-07-26 15:20:31

np, sometimes you just need a fresh pair of eyes! – 2012-07-26 16:18:30
