
Save groupBy RDD results back to HDFS

I have some files on HDFS in a thrift-compact serialized format as input. By extending Hadoop's FileInputFormat, we can load these files into an RDD quickly.

After applying some groupBy transformations, the output RDD becomes a JavaPairRDD<Long, Iterable<thriftGeneratedClass>>. I now want to save this groupBy result to HDFS as multiple output files, one per key of the PairRDD. For example, if the PairRDD contains the two keys 100 and 200, two files 100.thrift and 200.thrift should be generated, each containing all the thrift objects in that key's iterable. The code looks like this:

//Feature is some thrift generated class 
JavaRDD<Feature> featureJavaRDD = jsc.newAPIHadoopFile(inputPath, ThriftInputFormat.class, 
      NullWritable.class, Feature.class, jsc.hadoopConfiguration()).values(); 
JavaPairRDD<Long, Iterable<Feature>> groupByRDD = featureJavaRDD.groupBy(...); 
//how to save groupByRDD results back to HDFS with files by key 

My question is: what is the best way to achieve this? I know the answer probably involves Hadoop's saveAsNewAPIHadoopFile and MultipleOutputs.

Answer


I had a similar use case a few days ago, and I solved it by writing two custom classes: one extending MultipleTextOutputFormat and one implementing RecordWriter.

My input was a JavaPairRDD<String, List<String>>, and I wanted to store it in files named after the keys, each containing all the lines of the corresponding list. (So it is almost the same use case.)

Here is my MultipleTextOutputFormat implementation:

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Progressable;

class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString(); //the return value is used as the file name
    }

    /** The following 4 methods are only redeclared for visibility purposes
     (they are used in the class MyRecordWriter) **/
    protected String generateLeafFileName(String name) {
        return super.generateLeafFileName(name);
    }

    protected V generateActualValue(K key, V value) {
        return super.generateActualValue(key, value);
    }

    protected String getInputFileBasedOutputFileName(JobConf job, String name) {
        return super.getInputFileBasedOutputFileName(job, name);
    }

    protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
        return super.getBaseRecordWriter(fs, job, name, arg3);
    }

    /** Use my custom RecordWriter **/
    @Override
    public RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
        final String myName = this.generateLeafFileName(name);
        return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
    }
}
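
Since the question wants output files named like 100.thrift and 200.thrift, one small variation on the method above (my suggestion, not part of the original answer) is to append the extension when building the file name:

@Override
protected String generateFileNameForKeyValue(K key, V value, String name) {
    //sketch: append the ".thrift" suffix from the question, so key 100 produces "100.thrift"
    return key.toString() + ".thrift";
}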

And here is my RecordWriter implementation:

import java.io.IOException;
import java.util.List;
import java.util.TreeMap;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

class MyRecordWriter<K, V> implements RecordWriter<K, V> {

    private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
    private final FileSystem fs;
    private final JobConf job;
    private final Progressable arg3;
    private String myName;

    //one underlying writer per output file, keyed by file name
    TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<>();

    MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
        this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
        this.fs = fs;
        this.job = job;
        this.arg3 = arg3;
        this.myName = myName;
    }

    @Override
    public void write(K key, V value) throws IOException {
        //derive the output file name from the key
        String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
        String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
        Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
        //reuse the writer for this file, or create it on first use
        //(raw type on purpose: the underlying text writer accepts any Object)
        RecordWriter rw = this.recordWriters.get(finalPath);
        if (rw == null) {
            rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
            this.recordWriters.put(finalPath, rw);
        }
        //the value is the List<String> of lines belonging to this key
        List<String> lines = (List<String>) actualValue;
        for (String line : lines) {
            rw.write(null, line);
        }
    }

    @Override
    public void close(Reporter reporter) throws IOException {
        //close every underlying writer
        for (RecordWriter<K, V> rw : this.recordWriters.values()) {
            rw.close(reporter);
        }
        this.recordWriters.clear();
    }
}

Most of this code is exactly the same as in FileOutputFormat. The only difference is these few lines:

List<String> lines = (List<String>) actualValue; 
for (String line : lines) { 
    rw.write(null, line); 
} 

These lines write every line of the input List<String> to the file. The first argument of the write function is set to null to avoid also writing the key on each line.

Finally, the only call I need to make to write my files is:

javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);
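
For the question's JavaPairRDD<Long, Iterable<Feature>>, the same approach should work once each Feature is turned into a line of text. Below is a minimal sketch of that adaptation, assuming the thrift objects are serialized with libthrift's TSerializer and Base64-encoded so that each record fits on one text line; the serialization choice, the Base64 encoding, and the class/method names are my assumptions, not part of the original answer.

import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;

public class SaveFeaturesByKey {

    public static void save(JavaPairRDD<Long, Iterable<Feature>> groupByRDD, String outputPath) {
        //turn each group of thrift objects into the List<String> of lines the writer expects
        JavaPairRDD<Long, List<String>> linesByKey = groupByRDD.mapValues(features -> {
            //TSerializer is created inside the closure because it is not Serializable
            TSerializer serializer = new TSerializer(new TCompactProtocol.Factory());
            List<String> lines = new ArrayList<>();
            for (Feature feature : features) {
                //one Base64-encoded thrift-compact record per output line (assumption)
                lines.add(Base64.getEncoder().encodeToString(serializer.serialize(feature)));
            }
            return lines;
        });

        //same call as above, now producing one output file per Long key
        linesByKey.saveAsHadoopFile(outputPath, Long.class, List.class, RDDMultipleTextOutputFormat.class);
    }
}

Whether Base64 text is an acceptable on-disk format is a separate design choice; a binary OutputFormat would avoid the encoding step at the cost of more custom code.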