2017-05-04 1299 views
2

我在我的Spring Boot應用程序中使用了KafkaListener接口,工作得很好。偏移由Kafka本身存儲。當使用@KafkaListener註解時,是否有標準方法來重置偏移量?

現在讓我們說一個話題的消費者部署一個新的版本,並擰緊2個小時的消息。然後,他們修復了該應用程序,並希望以兩小時前的偏移量啓動新版本。

我可以在之前的consumer.offsetsForTimes()調用中使用consumer.seek(),但這僅僅在我使用輪詢機制時很直接,而不是在使用Spring KafkaListener時。

我試圖用一個ConsumerRebalanceListener,但設置我需要的消費是積極的偏移量,其中實例化ConsumerRebalanceListener時,我沒有...

我需要切換到投票? 在我第一次調用KafkaListener之前,我可以在消費者處獲得消費者,但在消費者被分配了分區後?

回答

1

不兼任;即將推出的2.0 release介紹ConsumerAwareMessageListener,它支持將Consumer作爲參數傳遞給@KafkaListener方法。

+0

非常感謝你這麼快的迴應。 但是,目前推薦的處理所描述的場景的方法是什麼?也許重播日誌製作者? (https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-ReplayLogProducer) - 但我無法將偏移量作爲範圍參數或類似參數傳遞,以便我可以讓應用程序暫時處理不同的主題。 – smlgbl

+0

我在想'ConsumerSeekAware'能不能幫助:http://docs.spring.io/spring-kafka/reference/html/_reference.html#seek。您可能需要爲處理後的消息存儲'offset'以用於隨後的'ConsumerSeekAware.onPartitionsAssigned()' –

+0

您可以實現'ConsumerSeekAware',它允許任意查找,但因爲消費者不是線程安全的,所以回調只能在消息傳遞期間或在容器空閒事件發佈時在應用程序偵聽器中調用。但是,沒有辦法訪問'offsetsForTimes'方法,因此您需要其他方法來確定查找點。 –

相關問題