我實現了卡夫卡的消費者應用程序,我只是想知道,如果我在PM2集羣模式下運行這個程序,將所有的內核消耗相同的消息或不同的消息?有沒有一種方法可以驗證它?在集羣模式下運行此應用程序是否理想?我在集羣模式下運行這個原因是因爲我們的卡夫卡產生了大量的消息。卡夫卡的NodeJS與消費羣PM2
也是目前如果我在PM2集羣模式下運行,這所有的核心都達到了它的CPU使用率達到100%。它是否會像這樣發生?
FYI:我使用https://www.npmjs.com/package/no-kafka
我實現了卡夫卡的消費者應用程序,我只是想知道,如果我在PM2集羣模式下運行這個程序,將所有的內核消耗相同的消息或不同的消息?有沒有一種方法可以驗證它?在集羣模式下運行此應用程序是否理想?我在集羣模式下運行這個原因是因爲我們的卡夫卡產生了大量的消息。卡夫卡的NodeJS與消費羣PM2
也是目前如果我在PM2集羣模式下運行,這所有的核心都達到了它的CPU使用率達到100%。它是否會像這樣發生?
FYI:我使用https://www.npmjs.com/package/no-kafka
將全部核消耗相同的消息或不同的消息?有沒有一種方法可以驗證它?
這取決於你主題配置+消費結構。我們舉個例子吧。
也是目前如果我在PM2集羣模式下運行,這所有的核心都達到了它的CPU使用率達到100%。它是否會像這樣發生?
我並不真正熟悉no-kafka以及消息如何處理。
但檢查時,庫等待提交是否獲取下一批消息之前發生。
如果不是有可能是一個機會,你的進程的信息創建過多的處理程序。
基於PM2的羣集僅適用於網絡服務器,因爲羣集進程共享傳入網絡端口並分發請求。
就你而言,數據源是消息訂閱,它必須手動分發到集羣的工作進程。
所以,爲了安全起見,主進程應與數據源的交互,並均勻地分配消息工作進程,以便在外部,它似乎是一個單一的消費者,但仍然可以處理所有的消息CPU核心。
下面的例子演示了這樣的設置,而不依賴於基於PM2集羣:
const cluster = require('cluster');
const _ = require('lodash');
const os = require('os');
// dispatch index
let dispatchIndex = 0;
/**
* Dispatches data to workers in a cyclic fashion
* @param {*} data - data to process
*/
function dispatch(data) {
// ensure master
if (!cluster.isMaster) {
throw new Error('Only master can dispatch');
}
// get worker ids, sorted
const workersIds = _.sortBy(_.keys(cluster.workers), _.identity);
// ensure at least one worker is available
if (workersIds.length < 1) {
throw new Error('No worker process alive');
}
// select next worker
dispatchIndex = dispatchIndex >= workersIds.length ? 0 : dispatchIndex;
const worker = cluster.workers[workersIds[dispatchIndex]];
dispatchIndex++;
// send data to worker
worker.send(data);
}
// Main Script
if (cluster.isMaster) {
// Setup master process
console.info(`Master ${process.pid} started.`);
// fork worker processes to match available CPUs
const numCpu = os.cpus().length;
for (let i = 0; i < numCpu; i++) {
cluster.fork();
}
// *** Get/Subscribe data from external source and dispatch to workers ***
setInterval(() => dispatch({ a: 'value' }), 1000);
} else if (cluster.isWorker) {
// Setup worker process
console.info(`Worker ${process.pid} started.`);
// *** handle dispatched data ***
process.on('message', (data) => {
console.info(`Data processed by ${process.pid}`);
});
}
這也是很好的閱讀了cluster module documentation。
請檢查配置選項並按可用分區數拆分您的使用者。如果您的消費者正在閱讀特定主題的多個分區,則會收到重複的消息, –