2017-06-01 80 views
0

我爲卡夫卡客戶端和生產者創建了單一類,只創建一個對象。 我需要多次發佈相同的主題,而無需創建新的客戶端和生產者實例。 我發現producer.on('ready',fn(){})沒有使用相同的客戶端和生產者實例觸發,只有當我有新的客戶端和生產者對象時它纔會被觸發。就緒事件上的kafka節點沒有被觸發

在這裏,示例代碼:

Singleton類:

const kafka = require('kafka-node'); 
const logger = require('./../../../../applogger'); 
const kafkaConfig = require('./../../../../config/config'); 

function ConnectionProvider() { 
    let kafkaConnection = undefined; 
    let client = undefined; 

    this.getConnection =() => { 

     if (!this.kafkaConnection) { 
      logger.info("Creating new kafka connection ------------------------------------- "); 
      this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST); 
      this.kafkaConnection = new kafka.Producer(this.client); 
     } 
     return this.kafkaConnection; 
    }; 
    this.getClient =() => { 
     if (!this.client) { 
      logger.info("Creating new kafka Client ------------------------------------- "); 
      this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST); 
     } 
     return this.client; 

    } 
    process.on('SIGINT', function() { 
     logger.info("Going to terminate kafka connection...!"); 
     process.exit(0); 
    }); 
} 
module.exports = exports = new ConnectionProvider; 

主題發佈方法:

const kafkaClient = require('./../core/kafkaConnection'); 

    const publishToKafka = function(dataPayload, callback) { 
     logger.debug('Publishing to topic ', topicName, ' with data: ', dataPayload); 
     let producer = kafkaClient.getConnection(); 

     producer.on('ready', function() { 
      let payloads = dataPayload; 
      producer.send(payloads, function(err, data) { 
       if (err) { 
        logger.error(
         'Error in publishing message to messaging pipeline ', err 
        ); 
        callback(err, null); 
        return; 
       } 

       logger.debug('Published message to messaging pipeline topic ', topicName, ' with result: ', data); 

       callback(null, data); 
       return; 
      }); 
     }); 

     producer.on('error', function(err) { 
      logger.error(
       'Error in publishing message to messaging pipeline ', err 
      ); 
      producer.close(); 
     }); 

    }; 

DataPayload是: 讓dataPayload = [{主題:某話題,消息:someMessage }]

我需要調用PublishToKafka方法mult很多次,但只想創建一個kafka客戶端和生產者實例。 但生產者沒有發佈主題,因爲producer.on('ready',function(){})在使用客戶端和生產者的同一對象時沒有被觸發。

在此先感謝。

回答

0

我通過在每次調用後關閉kafka生產者和客戶端實例解決了這個問題,因爲我需要多次發佈到kafka生產者,但默認情況下kafka zookeeper只允許60個最大連接(如果我們想要增加連接的價值)。所以這就是爲什麼爲單個卡夫卡實例創建單例類。

但是在創建kafka的單個實例後,其producer.on('ready')事件不會被觸發,因爲第二次當我們使用已經處於就緒狀態的kafka生產者的同一對象時。所以我們需要每一次發佈新的製作者實例。

const publishToKafka = function(topicName, dataPayload, callback) { 
    logger.debug('Publishing to topic ', topicName, ' with data: ', dataPayload); 
    let client = new kafka.Client(kakfaConfig.ZOOKPER_HOST); 
    let producer = new kafka.Producer(client); 


    producer.on('ready', function() { 
     let payloads = dataPayload; 
     producer.send(payloads, function(err, data) { 
      if (err) { 
       logger.error(
        'Error in publishing message to messaging pipeline ', err 
       ); 
       callback(err, null); 
       return; 
      } 

      logger.debug('Published message to messaging pipeline topic ', topicName, ' with result: ', data); 
      producer.close(); 
      client.close(); 
      callback(null, data); 

      return; 
     }); 
    }); 

    producer.on('error', function(err) { 
     logger.error(
      'Error in publishing message to messaging pipeline ', err 
     ); 
     producer.close(); 
    }); 

}; 

無需爲單個對象創建單例類。