2011-04-18 52 views
71

在MapReduce的每個任務減少它的輸出寫入到一個命名爲部分-R-NNNNN文件其中NNNNN是與減少任務相關聯的分區ID。 map/reduce合併這些文件嗎?如果是,如何?合併的輸出文件之後減少相

回答

110

而不是做文件合併你自己的,你可以通過調用委派減少輸出文件的整個合併:

hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt 

注意這結合了HDFS文件本地。請確保您有足夠的磁盤空間運行

+16

有沒有辦法做到這一點,但在dfs上?我的意思是我想將它們合併到dfs上的單個文件中? – humanzz 2012-05-22 18:18:01

+1

這適用於dfs。這是正確的安裝程序 – slayton 2013-05-24 01:41:15

+10

它似乎不適用於dfs,合併的文件被寫入本地文件系統。當然,你可以把它寫回來,但看起來很浪費。 – 2014-08-14 12:31:47

3

您可以運行其他map/reduce任務,其中map和reduce不更改數據,partitioner將所有數據分配給單個reducer。

+7

瘋狂的矯枉過正,而-getmerge命令是存在的。 – marcorossi 2013-06-10 08:51:57

+1

如果您需要合併比本地計算機可以處理的更多數據,則不需要 – Havnar 2016-05-27 07:01:29

25

不,這些文件不會被Hadoop合併。您獲得的文件數量與減少任務的數量相同。

如果你需要這個作爲下一份工作的輸入,那麼不要擔心分開的文件。只需指定整個目錄作爲下一個作業的輸入。

如果您確實需要羣集外的數據,那麼通常我會在將數據從羣集中拉出時將它們合併到接收端。

I.e.是這樣的:

hadoop fs -cat /some/where/on/hdfs/job-output/part-r-* > TheCombinedResultOfTheJob.txt 
+0

感謝您在map/reduce(* mapred-default.xml *)配置文件中的答案buf,這裏有一個名爲* io.sort.factor *的屬性,它用於什麼? – Shahryar 2011-04-19 07:06:05

+2

io.sort.factor與地圖和縮小步驟之間的處理有關。不是減少的輸出。 – 2011-04-19 07:29:37

+0

你怎麼知道part-r- *文件將被合併的順序是正確的? – Razvan 2016-04-28 09:55:38

0

之前爲什麼不使用豬這樣的腳本一個用於合併分區中的文件:

stuff = load "/path/to/dir/*" 

store stuff into "/path/to/mergedir" 
8

這是你可以用它來合併文件在HDFS

public boolean getMergeInHdfs(String src, String dest) throws IllegalArgumentException, IOException { 
    FileSystem fs = FileSystem.get(config); 
    Path srcPath = new Path(src); 
    Path dstPath = new Path(dest); 

    // Check if the path already exists 
    if (!(fs.exists(srcPath))) { 
     logger.info("Path " + src + " does not exists!"); 
     return false; 
    } 

    if (!(fs.exists(dstPath))) { 
     logger.info("Path " + dest + " does not exists!"); 
     return false; 
    } 
    return FileUtil.copyMerge(fs, srcPath, fs, dstPath, false, config, null); 
} 
功能
6

只能用於文本文件和HDFS的源和目標,使用以下命令:

hadoop fs -cat /input_hdfs_dir/* | hadoop fs -put - /output_hdfs_file

這將連接input_hdfs_dir中的所有文件,並將輸出回寫到HDFS output_hdfs_file。請記住,所有數據都將被帶回到本地系統,然後再次上傳到hdfs,儘管沒有創建臨時文件,並且這使用UNIX pe進行。

此外,這不會與非文本文件,如Avro中,ORC等

對於二進制文件,你可以做這樣的事情工作(如果你有映射到目錄配置單元表):

insert overwrite table tbl select * from tbl

根據您的配置,這也可能創造出比文件的更多。要創建單個文件,請使用mapreduce.job.reduces=1明確設置reducer的數量爲1,或將hive屬性設置爲hive.merge.mapredfiles=true

+0

有了這個解決方案,也知道從stdin進入最終目標的可能輸入。也就是說,當遇到啓用HA的羣集時,當遇到其中一個節點處於待機模式時,會出現警告消息。在那種情況下,我的輸出包含了另外的無害的警告信息。 [鏈接](https://s.apache.org/sbnn-error) – kasur 2016-10-20 21:47:29

3

part-r-nnnnn文件在由'r'指定的縮小階段之後生成。現在事實是,如果你有一個reducer運行,你將有一個輸出文件,如part-r-00000。如果reducer的數量是2,那麼你將有part-r-00000和part-r-00001等等。看起來,如果輸出文件太大而無法放入機器內存,因爲hadoop框架已設計爲在商品機器上運行,則文件將被分割。根據MRv1,您有20個減速器的限制來處理您的邏輯。您可能需要在配置文件mapred-site.xml中定製更多但相同的需求。 談論你的問題;可以選擇使用getmerge或者您可以通過嵌入下面的語句來驅動代碼

job.setNumReduceTasks(1); 

希望這回答你的問題減速器的數量設置爲1。

1

除了我之前的回答,我還有一個答案是你在幾分鐘前嘗試的。 您可以使用CustomOutputFormat,它看起來像下面

public class VictorOutputFormat extends FileOutputFormat<StudentKey,PassValue> { 

    @Override 
    public RecordWriter<StudentKey,PassValue> getRecordWriter(
      TaskAttemptContext tac) throws IOException, InterruptedException { 
     //step 1: GET THE CURRENT PATH 
     Path currPath=FileOutputFormat.getOutputPath(tac); 

     //Create the full path 
     Path fullPath=new Path(currPath,"Aniruddha.txt"); 

     //create the file in the file system 
     FileSystem fs=currPath.getFileSystem(tac.getConfiguration()); 
     FSDataOutputStream fileOut=fs.create(fullPath,tac); 
     return new VictorRecordWriter(fileOut); 
    } 

} 

剛剛給出的代碼,看看從去年第四行。我用我自己的名字作爲輸出文件名,我用15個減速器測試了這個程序。仍然文件保持不變。因此,獲得單個輸出文件而不是兩個或更多輸出文件的大小可能非常明顯,輸出文件的大小不能超過主存儲器的大小,即輸出文件必須適合商品機器的內存,否則可能存在輸出文件拆分的問題。 謝謝!

+0

getmerge可以解決你的目的,但這是一種選擇。但這很有用 – 2015-10-27 10:22:37

0

如果文件有報頭,你可以這樣做擺脫它的:

hadoop fs -cat /path/to/hdfs/job-output/part-* | grep -v "header" > output.csv 

然後手動添加標題爲output.csv

0

。 map/reduce合併這些文件嗎?

不,它不合並。

您可以使用IdentityReducer來實現您的目標。

不執行縮減,將所有輸入值直接寫入輸出。

public void reduce(K key, 
        Iterator<V> values, 
        OutputCollector<K,V> output, 
        Reporter reporter) 
      throws IOException 

直接寫入所有的鍵和值來輸出。

看一看相關SE帖子:

hadoop: difference between 0 reducer and identity reducer?