一個典型的卡夫卡消費者如下所示:卡夫卡高級用戶0.8.4防止郵件丟失
卡夫卡經紀人--->卡夫卡消費者---->下游消費者喜歡彈性搜索
而且根據對Kafka High Level Consumer的文檔:
的「auto.commit.interval.ms」設置更新頻率的 消耗偏移寫入的ZooKeeper
看來,有可能是郵件丟失,如果以下兩件事情:
- 偏移致力於剛過有些消息是從卡夫卡經紀人取回。
- 下游消費者(比如Elastic-Search)未能處理最近一批消息,或者消費者進程本身被終止。
這或許會是最爲理想的,如果偏移量不犯自動根據一定的時間間隔,但他們是通過一個API承諾。這將確保kafka-消費者只有在收到來自下游消費者的確認他們已經成功消費消息之後才能發信號通知補償。可能會有一些消息的重播(如果kafka-消費者在抵消之前死亡),但至少不會有消息丟失。
請讓我知道,如果這樣的API存在於高級消費者。
注:我知道在卡夫卡的0.8.4版本低層次的消費API的,但我不希望管理自己的一切時,我需要的是高層次的消費只是一個簡單的API。
價:
- AutoCommitTask.run(),尋找commitOffsetsAsync
- SubscriptionState.allConsumed()