2015-05-04 57 views
1

我的spark應用程序使用自定義hadoop輸入格式處理文件(平均大小爲20 MB)並將結果存儲在HDFS中。如何使用hadoop自定義輸入格式調整Spark應用程序

以下是代碼片段。

Configuration conf = new Configuration(); 


JavaPairRDD<Text, Text> baseRDD = ctx 
    .newAPIHadoopFile(input, CustomInputFormat.class,Text.class, Text.class, conf); 

JavaRDD<myClass> mapPartitionsRDD = baseRDD 
    .mapPartitions(new FlatMapFunction<Iterator<Tuple2<Text, Text>>, myClass>() { 
     //my logic goes here 
    } 

//few more translformations 
result.saveAsTextFile(path); 

該應用程序爲每個文件創建1個任務/分區,並處理相應的零件文件並將其存儲在HDFS中。

即,10000輸入文件被創建任務的萬和10000個的部分文件存儲在HDFS。

兩個mapPartitions和baseRDD地圖操作創建每個文件1個任務。

SO質疑 How to set the number of partitions for newAPIHadoopFile? 建議設置 conf.setInt("mapred.max.split.size", 4);配置不分區。

但是,當這個參數設置CPU利用最大,沒有舞臺,甚至很長一段時間之後,無法啓動。

如果我沒有設置此參數,那麼應用程序將成功完成,如上所述。

如何設置分區的數量與newAPIHadoopFile和提高效率?

mapred.max.split.size選項,會發生什麼?

============

更新: 與mapred.max.split.size選項,會發生什麼?

在我的使用情況下,文件尺寸小和改變分割大小選項是這裏無關緊要。這個SO

更多信息:Behavior of the parameter "mapred.min.split.size" in HDFS

+0

mapred.max.split.size指定以字節爲單位的大小,我認爲 –

回答

0

只需使用baseRDD.repartition(<a sane amount>).mapPartitions(...)。這會將結果操作移至更少的分區,尤其是在文件很小的情況下。

相關問題