2014-11-06 64 views
11

前提

我試圖找到正確的方式提前終止了一系列的Node.js管道流(管道)的:有時我要在流完成之前正常中止流。具體而言,我主要處理objectMode: true和非本地平行流,但這應該不重要。有道unpipe一個streams2管道和清空(不只是沖水)

問題

問題是,當我unpipe管道,數據保持在每個流的緩衝器,並且drain編對於大多數中間流(例如,Readable/Transform),這個可能是,但最後的Writable仍然排到其寫入目標(例如文件或數據庫或套接字或w/e)。如果緩衝區包含數百或數千個需要耗費大量時間的塊,這可能會產生問題。我希望它立即停止,即不排水;爲什麼浪費週期和內存對數據無關緊要?

根據我走的路線,我收到「寫完後」錯誤,或者當流找不到現有管道時發生異常。

問題

什麼是有道優雅地殺死流的形式a.pipe(b).pipe(c).pipe(z)管道?

解決方案?

我已經提出瞭解決的辦法是3步:

  1. unpipe管道中的每個流以相反的順序
  2. 實現Writable
  3. end每個實現Writable流空每個流的緩衝器

說明整個過程的一些僞代碼:

var pipeline = [ // define the pipeline 
    readStream, 
    transformStream0, 
    transformStream1, 
    writeStream 
]; 

// build and start the pipeline 
var tmpBuildStream; 
pipeline.forEach(function(stream) { 
    if (!tmpBuildStream) { 
     tmpBuildStream = stream; 
     continue; 
    } 
    tmpBuildStream = lastStream.pipe(stream); 
}); 

// sleep, timeout, event, etc... 

// tear down the pipeline 
var tmpTearStream; 
pipeline.slice(0).reverse().forEach(function(stream) { 
    if (!tmpTearStream) { 
     tmpTearStream = stream; 
     continue; 
    } 
    tmpTearStream = stream.unpipe(tmpTearStream); 
}); 

// empty and end the pipeline 
pipeline.forEach(function(stream) { 
    if (typeof stream._writableState === 'object') { // empty 
    stream._writableState.length -= stream._writableState.buffer.length; 
    stream._writableState.buffer = []; 
    } 
    if (typeof stream.end === 'function') { // kill 
    stream.end(); 
    } 
}); 

我真的很擔心的stream._writableState使用和修改內部bufferlength特性(_意味着私有財產)。這看起來像一個黑客。還請注意,由於我是管道系統,因此我們無法解決問題(基於我從IRC收到的建議),如pauseresume

我也把一個可運行的版本(相當草率),你可以從GitHub抓取:https://github.com/zamnuts/multipipe-proto(GIT克隆,NPM安裝,查看自述,NPM開始)

+0

我也有趣如何搶佔並停止對效率的巨大(多千兆字節)的數據流。 (例如,您只想讀取標題) – user949300 2015-03-11 21:24:03

+2

據我所知,沒有官方解決方案來清除寫入流。我能想到的唯一解決方案是編寫一個自定義轉換流,您可以在管道中的寫入流之前插入它。這個流將實現它自己的緩衝行爲,接管來自寫入流的責任。因爲我們擁有這個流的緩衝機制,所以我們可以構建一個方法來清除它,而不是訴諸黑客。寫入流應該得到一個非常低的highWaterMark,以便在我們終止時最小化要寫入的數據。 – 2015-03-17 19:47:45

+0

@JasperWoudenberg我認爲你在那裏。此外,自從編寫這個問題以來,IIRC就已經有解決這個緩衝怪癖的發佈。 – zamnuts 2015-03-18 18:08:20

回答

1

在這種特殊情況下,我覺得我們應該得到擺脫你有4個不同的不完全定製的流的結構。如果我們沒有實現我們自己的機制,將它們連接在一起將產生鏈條依賴性,這將很難控制。

我想專注於自己的實際工作中的目標位置:

INPUT >----[read] → [transform0] → [transform1] → [write]-----> OUTPUT 
       |   |    |   | 
KILL_ALL------o----------o--------------o------------o--------[nothing to drain] 

我認爲,上述結構可以通過組合定製來實現:

  1. duplex stream - 爲自己的_write(chunk, encoding, cb)_read(bytes)

    執行
  2. transform stream - 爲自己的_transform(chunk, encoding, cb)執行。

由於您使用的是writable-stream-parallel包您可能還需要去在他們的庫,因爲他們的duplex實現可以在這裏找到:https://github.com/Clever/writable-stream-parallel/blob/master/lib/duplex.js。 而他們的transform stream實現在這裏:https://github.com/Clever/writable-stream-parallel/blob/master/lib/transform.js。他們在這裏處理highWaterMark

可能的解決方法

他們寫流:https://github.com/Clever/writable-stream-parallel/blob/master/lib/writable.js#L189有一個有趣的功能writeOrBuffer,我想你也許可以調整它有點中斷從緩存寫入數據。

注意:這些3 flags所控制的緩衝器清除:

(!finished && !state.bufferProcessing && state.buffer.length) 

參考文獻:

+0

1)一起管道流允許我有一個可插入的體系結構 - 管道可以創建,組織,並根據用戶可定義的過濾器/變換/等長度不同。 2)你對「自定義組合」部分不太清楚,你可以詳細說明還是改寫一下? 3)你提到的可能的解決方案並不比我建議的解決方案更好或更差 - 我仍然需要修補我不保留的代碼。 – zamnuts 2015-03-18 18:06:30

+0

嗯,我還沒有實現這個解決方案,但我想指出的是,您可以利用創建自己的流,它可以很好地控制「底層」中發生的事情,而不是黑客已經存在的解決方案。這是一個非常令人興奮的話題,當我翻閱可寫入流並行文檔時,我可以看到越來越多的控制實現的地方。關於管道:我同意可插拔拱門。是偉大的,但在你的情況下,我會建議建立自己的「插頭」。 – 2015-03-18 18:25:34