該代碼用於將數據寫入到的DataNodes存在於2個文件:
DFSOutputStream.java
(包:org.apache.hadoop.hdfs
)
由客戶端寫入的數據被分成數據包(通常爲64K大小)。當數據包準備就緒時,數據將被排入數據隊列中,數據隊列由DataStreamer
拾取。
DataStreamer
(包:org.apache.hadoop.hdfs
)
它拿起在數據隊列中的分組,並將它們在管道發送到數據節點(典型地有3個數據節點,因爲複製因子在數據流水線, 3)。
它檢索一個新的塊ID並開始將數據流式傳輸到數據節點。當一個數據塊被寫入時,它關閉當前塊並獲得用於寫入下一組數據包的新塊。
的代碼,其中,將一個新塊得到的,是如下:
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if(LOG.isDebugEnabled()) {
LOG.debug("Allocating new block");
}
setPipeline(nextBlockOutputStream());
initDataStreaming();
}
的代碼,其中,所述當前塊被關閉時,低於:
// Is this block full?
if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (shouldStop()) {
continue;
}
endBlock();
}
在endBlock()
方法中,再次舞臺設置爲:
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
這意味着,將創建一個新的管道用於寫入下一組pa一個新的塊。
編輯:如何檢測到塊的結束?
由於DataStreamer
不斷追加數據到一個塊,它會更新寫入的字節數。
/**
* increase bytes of current block by len.
*
* @param len how many bytes to increase to current block
*/
void incBytesCurBlock(long len) {
this.bytesCurBlock += len;
}
它也保持檢查,如果寫入的字節數等於塊大小:如果達到塊大小
// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
enqueueCurrentPacketFull();
}
在上面的語句中,以下條件檢查:
getStreamer().getBytesCurBlock() == blockSize)
如果遇到塊邊界,則調用endBlock()
方法:
/**
* if encountering a block boundary, send an empty packet to
* indicate the end of block and reset bytesCurBlock.
*
* @throws IOException
*/
protected void endBlock() throws IOException {
if (getStreamer().getBytesCurBlock() == blockSize) {
setCurrentPacketToEmpty();
enqueueCurrentPacket();
getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
}
}
這將確保當前塊被關閉,並從Name Node
獲得用於寫入數據的新塊。
塊的大小是由dfs.blocksize
參數hdfs-site.xml
文件決定(它被設置爲128 MB在我的羣集= 134217728):
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
<description>The default block size for new files, in bytes.
You can use the following suffix (case insensitive): k(kilo),
m(mega), g(giga), t(tera), p(peta), e(exa) to specify the
size (such as 128k, 512m, 1g, etc.), Or provide complete size
in bytes (such as 134217728 for 128 MB).
</description>
</property>
真的很好的答案,但是如果if(one.isLastPacketInBlock()){}獲取最大塊大小的信息,if語句如何?代碼的哪一部分指示要分割爲128 MB的文件? – IFH
@Iris,我已經更新了答案。請檢查 –
完美答案!只需確認,調用enqueueCurrentPacketFull()的if語句;在DFSOutputStream.java中,對嗎? – IFH