我正在使用kafka節點來使用來自特定Kafka主題的消息。當我重新啓動我的節點服務器時,它會按預期啓動我的使用者,但默認行爲是從偏移量0開始消費,而我的目標是僅接收新消息(即從當前偏移量開始消耗)。我沒有找到從API文檔中實現這一點的方法。任何人都知道它的支持?kafka-node開始從上一次偏移消耗
謝謝!
我正在使用kafka節點來使用來自特定Kafka主題的消息。當我重新啓動我的節點服務器時,它會按預期啓動我的使用者,但默認行爲是從偏移量0開始消費,而我的目標是僅接收新消息(即從當前偏移量開始消耗)。我沒有找到從API文檔中實現這一點的方法。任何人都知道它的支持?kafka-node開始從上一次偏移消耗
謝謝!
我問卡夫卡節點github上的問題這個問題(link )並得到了答案。它現在可用(從v0.4.0開始)。以下片段適用於我:
consumerClient = new kafka.Client('localhost:2181');
/* Print latest offset. */
var offset = new kafka.Offset(consumerClient);
offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) {
var latestOffset = data['myTopic']['0'][0];
console.log("Consumer current offset: " + latestOffset);
});
var consumer = new kafka.HighLevelConsumer(
consumerClient,
[
{ topic: 'myTopic', partition: 0, fromOffset: -1 }
],
{
autoCommit: false
}
);
乾杯!
如果你想只接收新郵件,您可以創建消費者實例之前設置以下屬性: auto.offset.reset =最新
我應該怎麼做? – ItayB
/*在創建KafkaConsumer實例之前,您必須設置屬性。 */ props.setProperty(「auto.offset.reset」,「latest」);/*最早,最新*/ KafkaConsumer , ?> consumer = new KafkaConsumer <>(props); – Hussain
你確定你在談論JavaScript(node js)API嗎?看起來像C++ – ItayB