0

我實現了卡夫卡的消費者應用程序,我只是想知道,如果我在PM2集羣模式下運行這個程序,將所有的內核消耗相同的消息或不同的消息?有沒有一種方法可以驗證它?在集羣模式下運行此應用程序是否理想?我在集羣模式下運行這個原因是因爲我們的卡夫卡產生了大量的消息。卡夫卡的NodeJS與消費羣PM2

也是目前如果我在PM2集羣模式下運行,這所有的核心都達到了它的CPU使用率達到100%。它是否會像這樣發生?

FYI:我使用https://www.npmjs.com/package/no-kafka

+0

請檢查配置選項並按可用分區數拆分您的使用者。如果您的消費者正在閱讀特定主題的多個分區,則會收到重複的消息, –

回答

2

將全部核消耗相同的消息或不同的消息?有沒有一種方法可以驗證它?

這取決於你主題配置+消費結構。我們舉個例子吧。

  • 假設我們有一個包含3個分區的主題。
  • 現在我們開始1消費者流程與消費者羣體「some_consumer_group」。對於comsumer小組的細節看看這裏https://www.npmjs.com/package/no-kafka#groupconsumer-new-unified-consumer-api
  • 現在您的一位消費者正在傾聽3個分區。
  • 因爲卡夫卡維護每個主題的偏移量,每個分區對每個消費羣體消費的將獲得3個消息,從3個不同的分區。因此沒有消息的重複。
  • 現在讓我們再添加一個消費者流程。
  • 現在消費者組1的消費者組「some_consumer_group」正在監聽分區0和1,而消費者組2的消費者組「some_consumer_group」正在監聽分區2(它也可能會反過來)。
  • 最後,如果我們爲該組添加更多的消費者,現在我們讓每個消費者都聆聽1個分區
  • 如果這是設置,您將不會遇到重複的消息。

也是目前如果我在PM2集羣模式下運行,這所有的核心都達到了它的CPU使用率達到100%。它是否會像這樣發生?

我並不真正熟悉no-kafka以及消息如何處理。

但檢查時,庫等待提交是否獲取下一批消息之前發生。

如果不是有可能是一個機會,你的進程的信息創建過多的處理程序。

2

基於PM2的羣集僅適用於網絡服務器,因爲羣集進程共享傳入網絡端口並分發請求。

就你而言,數據源是消息訂閱,它必須手動分發到集羣的工作進程。

所以,爲了安全起見,主進程應與數據源的交互,並均勻地分配消息工作進程,以便在外部,它似乎是一個單一的消費者,但仍然可以處理所有的消息CPU核心。

下面的例子演示了這樣的設置,而不依賴於基於PM2集羣:

const cluster = require('cluster'); 
const _ = require('lodash'); 
const os = require('os'); 

// dispatch index 
let dispatchIndex = 0; 

/** 
* Dispatches data to workers in a cyclic fashion 
* @param {*} data - data to process 
*/ 
function dispatch(data) { 

    // ensure master 
    if (!cluster.isMaster) { 
     throw new Error('Only master can dispatch'); 
    } 

    // get worker ids, sorted 
    const workersIds = _.sortBy(_.keys(cluster.workers), _.identity); 

    // ensure at least one worker is available 
    if (workersIds.length < 1) { 
     throw new Error('No worker process alive'); 
    } 

    // select next worker 
    dispatchIndex = dispatchIndex >= workersIds.length ? 0 : dispatchIndex; 
    const worker = cluster.workers[workersIds[dispatchIndex]]; 
    dispatchIndex++; 

    // send data to worker 
    worker.send(data); 
} 


// Main Script 
if (cluster.isMaster) { 

    // Setup master process 
    console.info(`Master ${process.pid} started.`); 

    // fork worker processes to match available CPUs 
    const numCpu = os.cpus().length; 
    for (let i = 0; i < numCpu; i++) { 
     cluster.fork(); 
    } 

    // *** Get/Subscribe data from external source and dispatch to workers *** 
    setInterval(() => dispatch({ a: 'value' }), 1000); 

} else if (cluster.isWorker) { 

    // Setup worker process 
    console.info(`Worker ${process.pid} started.`); 

    // *** handle dispatched data *** 
    process.on('message', (data) => { 
     console.info(`Data processed by ${process.pid}`); 
    }); 
} 

這也是很好的閱讀了cluster module documentation