2017-02-23 141 views
1

我想知道如何在AckMode設置爲MANUAL在spring kafka中時提交如何工作。我如何知道我的記錄是否使用Spring Kafka手動提交

以下是我在KafkaConfig中設置的屬性 containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

listener代碼

@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup") 
    public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) { 
     countDownLatch.countDown();  
     acknowledgment.acknowledge(); 
} 

我做acknowledgement按春卡夫卡的文件,但這只是意味着我的郵件標記爲發送但不消耗(這是我的理解) 。

  1. 在這種情況下,我應該叫commitSync()方法。 如果,我從哪裏調用它,因爲我需要獲得對KafkaConsumer的引用。 如果否,它如何在內部工作,我可以跟蹤它嗎?

  2. 是否有commitId或返回的某個值? 我的想法是知道某個特定的消費者記錄是否被消費。 我想存儲用於內部跟蹤目的的值。

  3. 是否卡夫卡內部維護像消費者記錄(已確認致力於沒有犯),它可以幫助進行分類的任何狀態。

這將幫助我區分有多少記錄被消耗以及有多少記錄正在進行以及它們的狀態。

回答

2

我可以回答第一個問題。一切休息直接就像是一個關於Apache Kafka的故事。

既然我們無法從那裏我們想執行commit,但僅從其執行consumer.poll()同一個線程,我們存儲在內部KafkaMessageListenerContainer隊列中所有提交的請求,並看看到,在主要消費的循環之前執行this.consumer.poll()

即使您使用MANUAL_IMMEDIATE,實際consumer.commitSync()也會在與acknowledgment.acknowledge()不同的線程上執行。

OTOH手裏有在Consumer的API的樣子:

public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets); 

所以,沒有任何commitId鉤來解決。

我認爲在Apache Kafka中沒有這樣的概念,如Not Committed或其他任何東西。數據總是存在於主題日誌中,並且直到特定的管理操作或壓縮配置纔會從中刪除。

我覺得commit offset功能與我們有consumer group目的,並根據JavaDoc中完全捆綁:

* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every 
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API 
* should not be used. The committed offset should be the next message your application will consume, 
* i.e. lastProcessedMessageOffset + 1. 

所以,當你的消費死了,它會從最後重新啓動其集團承諾所抵消。不同的組可能會讀取相同的數據,但會從其他偏移量中讀取。我認爲這就是爲什麼他們的API沒有提供任何實際狀態的鉤子。沒有這樣的一個!

+0

感謝Artem的詳細回覆。 所以,我從上面的答案中瞭解到,當我們調用'acknowledgement.acknowledge()'時,'consumer.commitSync()'發生在另一個線程上。 因此,確保提交正在發生。 'commitAsync()'也是同樣的行爲嗎? 我希望它'AckMode.MANUAL'相同,因爲我沒有使用'MANUAL_IMMEDIATE' – user1564626

+0

是的,的確如此。任何「消費者」操作都發生在同一個線程上。 'MANUAL_IMMEDIATE'不同於'MANUAL',只能通過直接調用'consumer.wakeup()'來打破當前的民意調查。 –

相關問題