2016-01-21 61 views
2

節點v.4.2.3和貓鼬v.4.3.6Mongoose QueryStream.pause()不暫停?

我必須迭代一個大型(> 10k文檔)集合,並處理每個文檔。

閱讀關於如何處理這種迭代的文檔,我偶然發現了QueryStream,我認爲這會解決我所有的問題。

function progress(total, t, current) { 
    process.stdout.clearLine(); // clear current text 
    process.stdout.write(Math.round(t/total * 100) + '% ' + t + '/' + total + ' ' + current); 
    process.stdout.cursorTo(0); 
} 

function loadBalance(current, stream) { 
    if(!stream.paused && current > 50) { 
     log('DEBUG', 'loadBalance', 'pause'); 
     stream.pause(); 
    } else if (stream.paused && current < 10) { 
     log('DEBUG', 'loadBalance', 'resume'); 
     stream.resume(); 
    } 
} 

var total = 0, 
    error = 0, 
    goods = 0, 
    current = 0; 

stream = Raw.find().stream(); 
stream.on('data', function (doc) { 
    heavyProcess(doc, function (err, refined) { 
     current = current + 1; 
     loadBalance(current, stream); 
     printP(total, goods + error, current); 
     if(err) { 
      error = error + 1; 
      current = current - 1; 
      loadBalance(current, stream); 
     } else { 
      new Pure(refined).save(function (err) { 
       if(err) { 
        error = error + 1; 
        current = current - 1; 
        loadBalance(current, stream); 
       } else { 
        goods = goods + 1; 
        current = current - 1; 
        loadBalance(current, stream); 
       } 
      }); 
     } 
    }); 
}).on('error', function (err) { 
    log('ERROR', 'stream', err); 
}).on('close', function() { 
    log('INFO', 'end', goods + '/' + total + ' (' + (goods/total*100) + '%) OK_'); 
    log('INFO', 'end', error + '/' + total + ' (' + (error/total*100) + '%) NOK'); 
    log('INFO', 'end', (total - goods - error) + ' missing'); 
}); 

LOADBALANCE不會被調用,並打印在暫停流,但'data'事件繼續被解僱,甚至想過stream.paused返回true。

我誤解了pause()是做什麼的?還是我濫用QueryStream?

回答

0

所以,真正的問題是不是在我發佈的代碼,但在模型生成。

我用一個新的連接,連接生吧,開始太鏈接純淨,但在最後時刻它連接到默認的貓鼬連接:

db = mongoose.createConnection('mongodb://127.0.0.1/SNCF');   //Creer la connexion a mongodb 

db.on('error', console.error.bind(console, 'connection error:')); 
db.once('open', function() {          //Une fois connecte 

    raw = new mongoose.Schema(
     { 
      //... 
     }, 
     { 
      strict: true, 
      collection: 'Raw' 
     } 
    ); 

    Raw = db.model('Raw', raw, 'Raw'); //<--- OK 

    pure = new mongoose.Schema(
     { 
      //... 
     }, 
     { 
      strict: true, 
      collection: 'Pure' 
     } 
    ); 

    Pure = mongoose.model('Pure', pure, 'Pure'); //<-- ERROR 
}); 

所以沒有純粹的文檔進行保存,內存爆炸時CPU正常工作。

將錯誤的行更改爲Pure = db.model('Pure', pure, 'Pure');解決了問題,我甚至不必暫停流。

2

貓鼬查詢流是v1流。提到在doc爲節點0.8 ReadStream(http://mongoosejs.com/docs/api.html#querystream_QueryStream

這意味着暫停事件是「諮詢」 https://nodejs.org/api/stream.html#stream_compatibility_with_older_node_js_versions

諮詢在這裏意味着調用暫停後,一些數據事件仍然會泄漏槽。
這與底層流緩存有關,並且是正確的流v1行爲。
您將不得不使用調用暫停後產生的任何數據事件。 這種行爲肯定不是從開發商的角度最佳,這就是爲什麼它在流v2進行了改變(​​)

這裏是與V2查詢流的mongoogejs的問題,我不認爲有實施任何計劃v2查詢流很快。
https://github.com/Automattic/mongoose/issues/1907

報價表的問題,這可能是一種解決您的問題:

var readStream = (new stream.Readable({ objectMode: true })).wrap(Model.find({}).stream());