2012-04-28 56 views
4

我是新來的Hadoop和我已經成功只是爲了運行例子的wordCount:http://hadoop.apache.org/common/docs/r0.18.2/mapred_tutorial.htmlJava Hadoop:如何創建將輸入文件作爲輸入文件並輸出每個文件中行數的輸出?

假設我們有有3個文件的文件夾。我想爲每個文件都有一個映射器,這個映射器只會計算行數並將其返回給reducer。

然後,縮減器會將每個映射器的行數作爲輸入,並將所有3個文件中存在的行的總數作爲輸出。

因此,如果我們有以下3個文件

input1.txt 
input2.txt 
input3.txt 

和映射器返回:

mapper1 -> [input1.txt, 3] 
mapper2 -> [input2.txt, 4] 
mapper3 -> [input3.txt, 9] 

減速會給

3+4+9 = 16 

輸出我已經做到了這一點一個簡單的Java應用程序,所以我想在Hadoop中完成它。我只有一臺電腦,並希望嘗試在僞分佈式環境中運行。

我該如何做到這一點?我應該做什麼適當的步驟?

我的代碼應該看起來像apache的例子嗎?我將有兩個靜態類,一個用於縮放器的mapper?或者我應該有3班,每個映射器一個?

如果你可以請指導我通過這個,我不知道如何做到這一點,我相信如果我設法編寫一些代碼來做到這一點,那麼我將來可以編寫更復雜的應用程序。

謝謝!

回答

5

我注意到你使用0.18版本的文檔。 Here's a link to 1.0.2(最新)。

第一個建議 - 使用IDE(日食,IDEA等)。它會幫助填補空白。

在實際的HDFS中,您無法知道每個文件所在的位置(不同的機器和羣集)。沒有保證行X甚至會駐留在與Y行相同的磁盤上。也不能保證行X不會在不同的計算機上分割(HDFS以塊形式分發數據,通常每個分區爲64Mb)。 這意味着你不能假定相同的映射器將處理整個文件。你可以確定的是,每個文件都由相同的reducer處理。

由於從映射器發送的每個鍵的縮減器都是唯一的,所以我會這樣做的方式是使用文件名作爲映射器中的輸出鍵。此外,映射器的默認輸入類是TextInputFormat,這意味着每個映射器將自行接收一整行(由LF或CR終止)。然後你可以從你的映射器中發出文件名和數字1(或者其他,與計算無關)。然後,在減速,您只需使用一個循環計數的文件名了多少次收到:映射器中的地圖功能

public static class Map extends Mapper<IntWritable, Text, Text, Text> { 

    public void map(IntWritable key, Text value, Context context) { 
    // get the filename 
    InputSplit split = context.getInputSplit(); 
    String fileName = split.getPath().getName(); 

    // send the filename to the reducer, the value 
    // has no meaning (I just put "1" to have something) 
    context.write(new Text(fileName), new Text("1")); 
    } 

} 

在減速的減少功能

public static class Reduce extends Reducer<Text, Text, Text, Text> { 

    public void reduce(Text fileName, Iterator<Text> values, Context context) { 
    long rowcount = 0; 

    // values get one entry for each row, so the actual value doesn't matter 
    // (you can also get the size, I'm just lazy here) 
    for (Text val : values) { 
     rowCount += 1; 
    } 

    // fileName is the Text key received (no need to create a new object) 
    context.write(fileName, new Text(String.valueOf(rowCount))); 
    } 

} 

在驅動程序/主

您幾乎可以使用與wordcount示例相同的驅動程序 - 請注意,我使用了新的mapreduce API,因此您需要調整一些內容(Job而不是JobConf等)。 This was really helpful當我讀到它。

請注意,您的MR輸出將只是每個文件名,併爲它的行數:

input1.txt 3 
input2.txt 4 
input3.txt 9 

如果你只是想指望在所有文件中的行總數,只需發出的所有相同的密鑰映射器(不是文件名)。這樣,將只有一個減速器來處理所有的行計數:

// no need for filename 
context.write(new Text("blah"), new Text("1")); 

你也可以連接該會處理每個文件的行數的輸出,或做其他花哨的東西一份工作 - 這是最高您。

我留下了一些樣板代碼,但基本知識在那裏。請務必檢查我,因爲我從記憶中輸入了大部分內容.. :)

希望這有助於!

10

除了sa125的回答之外,您可以通過不爲每個輸入記錄發出記錄而大大提高性能,而只需在映射器中累積計數器,然後在映射器清理方法中發出文件名並計數價值:

public class LineMapper extends Mapper<LongWritable, Text, Text, LongWritable> { 
    protected long lines = 0; 

    @Override 
    protected void cleanup(Context context) throws IOException, 
      InterruptedException { 
     FileSplit split = (FileSplit) context.getInputSplit(); 
     String filename = split.getPath().toString(); 

     context.write(new Text(filename), new LongWritable(lines)); 
    } 

    @Override 
    protected void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException { 
     lines++; 
    } 
}