2013-12-12 30 views
3

據我所知,分佈式緩存將文件複製到每個節點,然後映射或減少從本地文件系統讀取文件。如何使用Hadoop分佈式緩存將文件放入內存?

我的問題是:有沒有一種方法可以使用Hadoop分佈式緩存將我們的文件放入內存中,以便每個映射或reduce都可以直接從內存中讀取文件?

我的MapReduce程序爲每個節點分發一個約1M的png圖片,然後每個地圖任務從分佈式緩存中讀取圖片,並使用地圖輸入中的另一張圖片進行一些圖像處理。

回答

2
import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.IOException; 
import java.net.URI; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.filecache.DistributedCache; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 

public class WordCount { 

    public static class TokenizerMapper 
     extends Mapper<Object, Text, Text, IntWritable>{ 

    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 

    public void map(Object key, Text value, Context context 
        ) throws IOException, InterruptedException { 

      Path[] uris = DistributedCache.getLocalCacheFiles(context 
        .getConfiguration()); 





        try{ 
         BufferedReader readBuffer1 = new BufferedReader(new FileReader(uris[0].toString())); 
         String line; 
         while ((line=readBuffer1.readLine())!=null){ 
          System.out.println(line); 

         } 
         readBuffer1.close(); 
        }  
        catch (Exception e){ 
         System.out.println(e.toString()); 
        } 

        StringTokenizer itr = new StringTokenizer(value.toString()); 

     while (itr.hasMoreTokens()) { 
     word.set(itr.nextToken()); 
     context.write(word, one); 
     } 
    } 
    } 

    public static class IntSumReducer 
     extends Reducer<Text,IntWritable,Text,IntWritable> { 
    private IntWritable result = new IntWritable(); 

    public void reduce(Text key, Iterable<IntWritable> values, 
         Context context 
         ) throws IOException, InterruptedException { 
     int sum = 0; 
     for (IntWritable val : values) { 
     sum += val.get(); 
     } 
     int length=key.getLength(); 
     System.out.println("length"+length); 
     result.set(sum); 
/*  key.set("lenght"+lenght);*/ 
     context.write(key, result); 


    } 
    } 

    public static void main(String[] args) throws Exception { 

     final String NAME_NODE = "hdfs://localhost:9000"; 
    Configuration conf = new Configuration(); 

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    if (otherArgs.length != 2) { 
     System.err.println("Usage: wordcount <in> <out>"); 
     System.exit(2); 
    } 
    Job job = new Job(conf, "word count"); 
    job.setJarByClass(WordCount.class); 
    job.setMapperClass(TokenizerMapper.class); 
    job.setCombinerClass(IntSumReducer.class); 
    job.setReducerClass(IntSumReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 


    DistributedCache.addCacheFile(new URI(NAME_NODE 
     + "/dataset1.txt"), 
     job.getConfiguration()); 



    FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 

} 
+0

謝謝。我知道如何使用分佈式緩存。我的問題是如何將文件放入內存而不是本地文件系統。在你的程序中,每個地圖都會從本地文件系統讀取dataset1.txt。看來Spark可以滿足我的需求。 – hequn8128

+0

在setup()中加載圖片。 – Malcolm

0

很好的問題。我也試圖解決類似的問題。我不認爲Hadoop支持開箱即用的內存緩存。但是,爲了達到這個目的,在網格的某個地方放置另一個內存緩存應該不是很困難。我們可以將緩存的位置和參數的名稱作爲作業配置的一部分。

就上面的代碼示例而言,它並不回答原始問題。此外,它還展示了非最佳代碼示例。理想情況下,您應該訪問緩存文件作爲setup()方法的一部分,並緩存您可能想要用作map()方法一部分的任何信息。在上面的例子中,每個鍵值對都會讀取緩存文件一次,這會降低mapreduce作業的性能。