我注意到你使用0.18版本的文檔。 Here's a link to 1.0.2(最新)。
第一個建議 - 使用IDE(日食,IDEA等)。它會幫助填補空白。
在實際的HDFS中,您無法知道每個文件所在的位置(不同的機器和羣集)。沒有保證行X甚至會駐留在與Y行相同的磁盤上。也不能保證行X不會在不同的計算機上分割(HDFS以塊形式分發數據,通常每個分區爲64Mb)。 這意味着你不能假定相同的映射器將處理整個文件。你可以確定的是,每個文件都由相同的reducer處理。
由於從映射器發送的每個鍵的縮減器都是唯一的,所以我會這樣做的方式是使用文件名作爲映射器中的輸出鍵。此外,映射器的默認輸入類是TextInputFormat
,這意味着每個映射器將自行接收一整行(由LF或CR終止)。然後你可以從你的映射器中發出文件名和數字1(或者其他,與計算無關)。然後,在減速,您只需使用一個循環計數的文件名了多少次收到:映射器中的地圖功能
public static class Map extends Mapper<IntWritable, Text, Text, Text> {
public void map(IntWritable key, Text value, Context context) {
// get the filename
InputSplit split = context.getInputSplit();
String fileName = split.getPath().getName();
// send the filename to the reducer, the value
// has no meaning (I just put "1" to have something)
context.write(new Text(fileName), new Text("1"));
}
}
在減速的減少功能
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text fileName, Iterator<Text> values, Context context) {
long rowcount = 0;
// values get one entry for each row, so the actual value doesn't matter
// (you can also get the size, I'm just lazy here)
for (Text val : values) {
rowCount += 1;
}
// fileName is the Text key received (no need to create a new object)
context.write(fileName, new Text(String.valueOf(rowCount)));
}
}
在驅動程序/主
您幾乎可以使用與wordcount示例相同的驅動程序 - 請注意,我使用了新的mapreduce API,因此您需要調整一些內容(Job
而不是JobConf
等)。 This was really helpful當我讀到它。
請注意,您的MR輸出將只是每個文件名,併爲它的行數:
input1.txt 3
input2.txt 4
input3.txt 9
如果你只是想指望在所有文件中的行總數,只需發出的所有相同的密鑰映射器(不是文件名)。這樣,將只有一個減速器來處理所有的行計數:
// no need for filename
context.write(new Text("blah"), new Text("1"));
你也可以連接該會處理每個文件的行數的輸出,或做其他花哨的東西一份工作 - 這是最高您。
我留下了一些樣板代碼,但基本知識在那裏。請務必檢查我,因爲我從記憶中輸入了大部分內容.. :)
希望這有助於!