2017-02-14 67 views
0

我正在開發一個偵聽消息隊列(ActiveMQ)的node.js組件,並將收到的消息分批添加到redis(每批必須爲20個) 。Node.js應用程序偵聽消息隊列並將消息異步添加到redis

從ActiveMQ接收的消息數量爲每秒10個或更少時沒有問題。

我的問題是消息每4毫秒添加到隊列中。這會導致添加到批次的記錄數有時會超過每批20個。

const stompit = require('stompit'); 
var uuid = require('node-uuid'); 
var Redis = require('ioredis'); 

var redis = new Redis(); 
var pipeline = redis.pipeline(); 
var batchCounter = 0; 

stompit.connect({ host: 'localhost', port: 61613 }, function(err1, client) { 

client.subscribe({ destination: 'MyQueue' }, function(err2, msg) { 
    msg.readString('UTF-8', function(err3, body) { 

     if (batchCounter >= 20){ 
      pipeline.exec(function(err4, results) { 
       pipeline = redis.pipeline(); 
       batchCounter = 0; 
       console.log(results); 
      }); 
     } 

     batchCounter++; 
     pipeline.set(uuid.v1(), JSON.stringify(body)); 
     //client.disconnect(); 
     }); 
    }); 
    }); 

如何解決這個問題? 謝謝

回答

0

嘗試在調用.exec方法之前重置管道,我認爲這是一種異步方法。由於.exec將來會在某個時間點運行,因此增量和pipeline.set可以在它之前運行。

下面的代碼保存當前的管道和前.exec

if (batchCounter >= 20){ 
    let fullpipeline = pipeline; 
    pipeline = redis.pipeline(); 
    batchCounter = 0; 
    fullpipeline.exec(function(err4, results) { 
     console.log(err4, results); 
    }); 
} 

然後新的消息應該那麼只有附加到新的管道同步創建一個新文件。

+0

它仍然是可能的,你可以用多條管道會在同一時間Redis的結束。如果這也會導致問題,您可能需要一個fullpipelines隊列來處理。 – Matt