2016-06-21 99 views
2

我正在使用kafka節點來使用來自特定Kafka主題的消息。當我重新啓動我的節點服務器時,它會按預期啓動我的使用者,但默認行爲是從偏移量0開始消費,而我的目標是僅接收新消息(即從當前偏移量開始消耗)。我沒有找到從API文檔中實現這一點的方法。任何人都知道它的支持?kafka-node開始從上一次偏移消耗

謝謝!

回答

4

我問卡夫卡節點github上的問題這個問題(link )並得到了答案。它現在可用(從v0.4.0開始)。以下片段適用於我:

consumerClient = new kafka.Client('localhost:2181'); 

/* Print latest offset. */ 
var offset = new kafka.Offset(consumerClient); 

offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) { 
     var latestOffset = data['myTopic']['0'][0]; 
     console.log("Consumer current offset: " + latestOffset); 
}); 

var consumer = new kafka.HighLevelConsumer(
     consumerClient, 
     [ 
      { topic: 'myTopic', partition: 0, fromOffset: -1 } 
     ], 
     { 
      autoCommit: false 
     } 
); 

乾杯!

-1

如果你想只接收新郵件,您可以創建消費者實例之前設置以下屬性: auto.offset.reset =最新

+0

我應該怎麼做? – ItayB

+0

/*在創建KafkaConsumer實例之前,您必須設置屬性。 */ props.setProperty(「auto.offset.reset」,「latest」);/*最早,最新*/ KafkaConsumer consumer = new KafkaConsumer <>(props); – Hussain

+0

你確定你在談論JavaScript(node js)API嗎?看起來像C++ – ItayB