據我所知,分佈式緩存將文件複製到每個節點,然後映射或減少從本地文件系統讀取文件。如何使用Hadoop分佈式緩存將文件放入內存?
我的問題是:有沒有一種方法可以使用Hadoop分佈式緩存將我們的文件放入內存中,以便每個映射或reduce都可以直接從內存中讀取文件?
我的MapReduce程序爲每個節點分發一個約1M的png圖片,然後每個地圖任務從分佈式緩存中讀取圖片,並使用地圖輸入中的另一張圖片進行一些圖像處理。
據我所知,分佈式緩存將文件複製到每個節點,然後映射或減少從本地文件系統讀取文件。如何使用Hadoop分佈式緩存將文件放入內存?
我的問題是:有沒有一種方法可以使用Hadoop分佈式緩存將我們的文件放入內存中,以便每個映射或reduce都可以直接從內存中讀取文件?
我的MapReduce程序爲每個節點分發一個約1M的png圖片,然後每個地圖任務從分佈式緩存中讀取圖片,並使用地圖輸入中的另一張圖片進行一些圖像處理。
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
Path[] uris = DistributedCache.getLocalCacheFiles(context
.getConfiguration());
try{
BufferedReader readBuffer1 = new BufferedReader(new FileReader(uris[0].toString()));
String line;
while ((line=readBuffer1.readLine())!=null){
System.out.println(line);
}
readBuffer1.close();
}
catch (Exception e){
System.out.println(e.toString());
}
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
int length=key.getLength();
System.out.println("length"+length);
result.set(sum);
/* key.set("lenght"+lenght);*/
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
final String NAME_NODE = "hdfs://localhost:9000";
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
DistributedCache.addCacheFile(new URI(NAME_NODE
+ "/dataset1.txt"),
job.getConfiguration());
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
很好的問題。我也試圖解決類似的問題。我不認爲Hadoop支持開箱即用的內存緩存。但是,爲了達到這個目的,在網格的某個地方放置另一個內存緩存應該不是很困難。我們可以將緩存的位置和參數的名稱作爲作業配置的一部分。
就上面的代碼示例而言,它並不回答原始問題。此外,它還展示了非最佳代碼示例。理想情況下,您應該訪問緩存文件作爲setup()方法的一部分,並緩存您可能想要用作map()方法一部分的任何信息。在上面的例子中,每個鍵值對都會讀取緩存文件一次,這會降低mapreduce作業的性能。
謝謝。我知道如何使用分佈式緩存。我的問題是如何將文件放入內存而不是本地文件系統。在你的程序中,每個地圖都會從本地文件系統讀取dataset1.txt。看來Spark可以滿足我的需求。 – hequn8128
在setup()中加載圖片。 – Malcolm