2016-01-12 14 views
1

正如我們都知道當本地文本文件被複制到HDFS時,文件被分割成固定大小的128 MB。例如,當我將一個256 MB的文本文件複製到HDFS中時,將會有2個包含「分割」文件的塊(256/128)。Hadoop HDFS文件分成哪些Java文件塊

有人可以告訴我中的哪個java/jar文件Hadoop 2.7.1源代碼具有將文件拆分成塊以及哪些java/jar文件將塊寫入數據節點的目錄的功能。

幫我看看這段代碼。

我只找到了他們在FileInputFormat.java中找到的塊進行邏輯輸入拆分,而這不是我所需要的。我需要分割物理文件的java文件。

回答

1

該代碼用於將數據寫入到的DataNodes存在於2個文件:

  • DFSOutputStream.java(包:org.apache.hadoop.hdfs

    由客戶端寫入的數據被分成數據包(通常爲64K大小)。當數據包準備就緒時,數據將被排入數據隊列中,數據隊列由DataStreamer拾取。

  • DataStreamer(包:org.apache.hadoop.hdfs

    它拿起在數據隊列中的分組,並將它們在管道發送到數據節點(典型地有3個數據節點,因爲複製因子在數據流水線, 3)。

    它檢索一個新的塊ID並開始將數據流式傳輸到數據節點。當一個數據塊被寫入時,它關閉當前塊並獲得用於寫入下一組數據包的新塊。

    的代碼,其中,將一個新塊得到的,是如下:

    // get new block from namenode. 
    if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { 
        if(LOG.isDebugEnabled()) { 
        LOG.debug("Allocating new block"); 
        } 
        setPipeline(nextBlockOutputStream()); 
        initDataStreaming(); 
    } 
    

    的代碼,其中,所述當前塊被關閉時,低於:

    // Is this block full? 
    if (one.isLastPacketInBlock()) { 
        // wait for the close packet has been acked 
        synchronized (dataQueue) { 
        while (!shouldStop() && ackQueue.size() != 0) { 
         dataQueue.wait(1000);// wait for acks to arrive from datanodes 
        } 
        } 
        if (shouldStop()) { 
        continue; 
        } 
    
        endBlock(); 
    } 
    

    endBlock()方法中,再次舞臺設置爲:

    stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; 
    

    這意味着,將創建一個新的管道用於寫入下一組pa一個新的塊。

編輯:如何檢測到塊的結束?

由於DataStreamer不斷追加數據到一個塊,它會更新寫入的字節數。

/** 
    * increase bytes of current block by len. 
    * 
    * @param len how many bytes to increase to current block 
    */ 
void incBytesCurBlock(long len) { 
    this.bytesCurBlock += len; 
} 

它也保持檢查,如果寫入的字節數等於塊大小:如果達到塊大小

// If packet is full, enqueue it for transmission 
// 
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || 
    getStreamer().getBytesCurBlock() == blockSize) { 
    enqueueCurrentPacketFull(); 
} 

在上面的語句中,以下條件檢查:

getStreamer().getBytesCurBlock() == blockSize) 

如果遇到塊邊界,則調用endBlock()方法:

/** 
* if encountering a block boundary, send an empty packet to 
* indicate the end of block and reset bytesCurBlock. 
* 
* @throws IOException 
*/ 
protected void endBlock() throws IOException { 
    if (getStreamer().getBytesCurBlock() == blockSize) { 
     setCurrentPacketToEmpty(); 
     enqueueCurrentPacket(); 
     getStreamer().setBytesCurBlock(0); 
     lastFlushOffset = 0; 
    } 
} 

這將確保當前塊被關閉,並從Name Node獲得用於寫入數據的新塊。

塊的大小是由dfs.blocksize參數hdfs-site.xml文件決定(它被設置爲128 MB在我的羣集= 134217728):

<property> 
    <name>dfs.blocksize</name> 
    <value>134217728</value> 
    <description>The default block size for new files, in bytes. 
     You can use the following suffix (case insensitive): k(kilo), 
     m(mega), g(giga), t(tera), p(peta), e(exa) to specify the 
     size (such as 128k, 512m, 1g, etc.), Or provide complete size 
     in bytes (such as 134217728 for 128 MB). 
    </description> 
</property> 
+0

真的很好的答案,但是如果if(one.isLastPacketInBlock()){}獲取最大塊大小的信息,if語句如何?代碼的哪一部分指示要分割爲128 MB的文件? – IFH

+0

@Iris,我已經更新了答案。請檢查 –

+0

完美答案!只需確認,調用enqueueCurrentPacketFull()的if語句;在DFSOutputStream.java中,對嗎? – IFH

0

這不是一個jar文件或java文件,它具有分割文件的功能。這是執行此任務的客戶端守護程序。當你從本地加載文件時,客戶端首先只讀128MB,它通過詢問namenode找到一個存儲它的地方,並且它確保文件被正確地複製和複製。在這個階段,客戶端不會知道文件的實際大小,除非它將以相同的方式讀取所有的塊。

當您要存儲文件時,您提到的FileInputFormat.java不會被hdfs使用。它在您想要在該文件上運行任何mapreduce任務時使用。它與文件的存儲無關。

+0

感謝您的回答!但是肯定不應該在「客戶端守護進程任務」中聲明至少有一個if語句,該語句會不斷從文件讀取數據到塊中,直到文件達到最大大小(128 MB) 。 – IFH