
I need to process a 9 GB CSV file. During the MR job it has to do some grouping and produce a special format for a legacy system. How can I implement a Java MapReduce job that produces output values bigger than the maximum heap?

The input file looks like this:

AppId;Username;Other Fields like timestamps... 
app/10;Mr Foobar;... 
app/10;d0x;... 
app/10;Mr leet;... 
app/110;kr1s;... 
app/110;d0x;... 
... 

And the output file should simply look like this:

app/10;3;Mr Foobar;d0x;Mr leet 
app/110;2;kr1s;d0x 
^      ^  ^^^^^^^^^^^^^^^^^^^^ 
\ AppId \  \ A list of all users playing the game 
         \ Amount of users 

To solve this I wrote a mapper that emits the AppId as the key and the Username as the value. The map phase runs fine with this.
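Roughly, the mapper looks like this (a sketch based on the sample input above; the class name and field positions are assumptions, not the exact code):

import java.io.IOException; 

import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

public class AppToUserMapper extends Mapper<LongWritable, Text, Text, Text> { 
    private final Text appId = new Text(); 
    private final Text userId = new Text(); 

    @Override 
    protected void map(final LongWritable key, final Text value, final Context context) throws IOException, InterruptedException { 
     // Input line: AppId;Username;other fields... 
     final String[] fields = value.toString().split(";"); 
     this.appId.set(fields[0]); 
     this.userId.set(fields[1]); 
     context.write(this.appId, this.userId); 
    } 
} 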

The problem occurs in the reduce phase. There I get an Iterable<Text> userIds that can contain a huge number of user ids (> 5,000,000).

The reducer that handles this looks like this:

import java.io.IOException; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

public class UserToAppReducer extends Reducer<Text, Text, Text, UserSetWritable> { 
    private final UserSetWritable userSet = new UserSetWritable(); 

    @Override 
    protected void reduce(final Text appId, final Iterable<Text> userIds, final Context context) throws IOException, InterruptedException { 
     this.userSet.clear(); 

     // Collect every user id of this app id in memory before writing a single output record. 
     for (final Text userId : userIds) { 
      this.userSet.add(userId.toString()); 
     } 
     context.write(appId, this.userSet); 
    } 
} 

UserSetWritable is a custom Writable that stores the list of users. It is needed to produce the output (key = appId, value = list of usernames).

This is what UserSetWritable currently looks like:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.util.HashSet; 
import java.util.Set; 

import org.apache.hadoop.io.Writable; 

public class UserSetWritable implements Writable { 
    private final Set<String> userIds = new HashSet<String>(); 

    public void add(final String userId) { 
     this.userIds.add(userId); 
    } 

    @Override 
    public void write(final DataOutput out) throws IOException { 
     out.writeInt(this.userIds.size()); 

     for (final String userId : this.userIds) { 
      out.writeUTF(userId); 
     } 
    } 

    @Override 
    public void readFields(final DataInput in) throws IOException { 
     // Clear first: Hadoop reuses Writable instances between records. 
     this.userIds.clear(); 
     final int size = in.readInt(); 

     for (int i = 0; i < size; i++) { 
      this.userIds.add(in.readUTF()); 
     } 
    } 

    @Override 
    public String toString() { 
     // TextOutputFormat calls toString() to build the value part of the output line. 
     String result = ""; 
     for (final String userId : this.userIds) { 
      result += userId + "\t"; 
     } 

     result += this.userIds.size(); 
     return result; 
    } 

    public void clear() { 
     this.userIds.clear(); 
    } 
} 

With this approach I get a Java heap OutOfMemoryError:

Error: Java heap space 
attempt_201303072200_0016_r_000002_0: WARN : mapreduce.Counters - Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 
attempt_201303072200_0016_r_000002_0: WARN : org.apache.hadoop.conf.Configuration - session.id is deprecated. Instead, use dfs.metrics.session-id 
attempt_201303072200_0016_r_000002_0: WARN : org.apache.hadoop.conf.Configuration - slave.host.name is deprecated. Instead, use dfs.datanode.hostname 
attempt_201303072200_0016_r_000002_0: FATAL: org.apache.hadoop.mapred.Child - Error running child : java.lang.OutOfMemoryError: Java heap space 
attempt_201303072200_0016_r_000002_0: at java.util.Arrays.copyOfRange(Arrays.java:3209) 
attempt_201303072200_0016_r_000002_0: at java.lang.String.<init>(String.java:215) 
attempt_201303072200_0016_r_000002_0: at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:542) 
attempt_201303072200_0016_r_000002_0: at java.nio.CharBuffer.toString(CharBuffer.java:1157) 
attempt_201303072200_0016_r_000002_0: at org.apache.hadoop.io.Text.decode(Text.java:394) 
attempt_201303072200_0016_r_000002_0: at org.apache.hadoop.io.Text.decode(Text.java:371) 
attempt_201303072200_0016_r_000002_0: at org.apache.hadoop.io.Text.toString(Text.java:273) 
attempt_201303072200_0016_r_000002_0: at  com.myCompany.UserToAppReducer.reduce(UserToAppReducer.java:21) 
attempt_201303072200_0016_r_000002_0: at  com.myCompany.UserToAppReducer.reduce(UserToAppReducer.java:1) 
attempt_201303072200_0016_r_000002_0: at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164) 
attempt_201303072200_0016_r_000002_0: at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610) 
attempt_201303072200_0016_r_000002_0: at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444) 
attempt_201303072200_0016_r_000002_0: at org.apache.hadoop.mapred.Child$4.run(Child.java:268) 
attempt_201303072200_0016_r_000002_0: at java.security.AccessController.doPrivileged(Native Method) 
attempt_201303072200_0016_r_000002_0: at javax.security.auth.Subject.doAs(Subject.java:396) 
attempt_201303072200_0016_r_000002_0: at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408) 
attempt_201303072200_0016_r_000002_0: at org.apache.hadoop.mapred.Child.main(Child.java:262) 

UserToAppReducer.java:21 is this line: this.userSet.add(userId.toString());

On the same cluster I am able to process the data with this Pig script:

set job.name convertForLegacy 
set default_parallel 4 
data = load '/data/...txt' using PigStorage(',') as (appid:chararray, uid:chararray, ...); 
grp = group data by appid; 
counter = foreach grp generate group, data.uid, COUNT(data); 
store counter into '/output/....' using PigStorage(','); 

So how can I solve this OutOfMemoryError with MapReduce?


It looks like your Pig script isn't actually doing the same thing as your reducer (counting the number of users rather than compiling a set of unique users) - which behaviour do you want? – 2013-03-08 11:38:35


Oh, you are right. I would like to have a unique set – d0x 2013-03-08 13:04:16


Which problem are you trying to solve: how to deduplicate the list of users, or how to fit a huge list of users on a single line? In the first case you should look into secondary sort. – Olaf 2013-03-08 14:07:11

Answer


A similar question about writing out "big" values: Handling large output values from reduce step in Hadoop

In addition to using that concept to write out the big record (getting the CSV list of hundreds of thousands of users you're after), you need to use a composite key (app id and user id) and a custom partitioner to ensure that all keys for a single app id make their way to a single reducer.

Something like this gist (untested).
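A rough, untested sketch of what the composite key, custom partitioner and grouping comparator might look like (class names are placeholders, not code from the gist):

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 
import org.apache.hadoop.mapreduce.Partitioner; 

public final class CompositeKeyExample { 

    // Composite key: app id plus user id, so the framework sorts the user ids 
    // and the reducer can stream them instead of holding them all in memory. 
    public static class AppUserKey implements WritableComparable<AppUserKey> { 
     private final Text appId = new Text(); 
     private final Text userId = new Text(); 

     public void set(final String app, final String user) { 
      this.appId.set(app); 
      this.userId.set(user); 
     } 

     public Text getAppId() { 
      return this.appId; 
     } 

     @Override 
     public void write(final DataOutput out) throws IOException { 
      this.appId.write(out); 
      this.userId.write(out); 
     } 

     @Override 
     public void readFields(final DataInput in) throws IOException { 
      this.appId.readFields(in); 
      this.userId.readFields(in); 
     } 

     @Override 
     public int compareTo(final AppUserKey other) { 
      final int cmp = this.appId.compareTo(other.appId); 
      return cmp != 0 ? cmp : this.userId.compareTo(other.userId); 
     } 
    } 

    // Partition on the app id only, so every record of one app reaches the same reducer. 
    public static class AppIdPartitioner extends Partitioner<AppUserKey, Text> { 
     @Override 
     public int getPartition(final AppUserKey key, final Text value, final int numPartitions) { 
      return (key.getAppId().hashCode() & Integer.MAX_VALUE) % numPartitions; 
     } 
    } 

    // Group on the app id only, so a single reduce() call sees all users of an app 
    // in sorted order, one at a time. 
    public static class AppIdGroupingComparator extends WritableComparator { 
     public AppIdGroupingComparator() { 
      super(AppUserKey.class, true); 
     } 

     @Override 
     public int compare(final WritableComparable a, final WritableComparable b) { 
      return ((AppUserKey) a).getAppId().compareTo(((AppUserKey) b).getAppId()); 
     } 
    } 
} 

With the partitioner and grouping comparator registered on the Job (setPartitionerClass, setGroupingComparatorClass), the reducer can write each user id straight to the output as it iterates, as described in the linked question, instead of buffering everything in a HashSet.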