2016-07-07 61 views
1

假設我有一個服務,它通過kafka-rest-proxy使用消息,並始終在同一個使用者組上。我們還要說,它正在消耗一個有一個分區的主題。當服務啓動時,它會在kafka-rest-proxy中創建一個新的使用者,並使用生成的使用者url直到服務關閉。當服務恢復時,它將在kafka-rest-proxy中創建一個新的消費者,並使用新的url(和新消費者)進行消費。卡夫卡休息代理消費者創建

我的問題

  1. 因爲卡夫卡只能有每個分區最多一個消費者。當消費者重新啓動時,在kafka和kafka-rest-proxy中會發生什麼?即新的消費者是在卡夫卡休息代理中創建的,但舊的消費者沒有機會被銷燬。所以現在'n'消費者在'kafka-rest-proxy'中重新啓動我的服務後有'n'個消費者,但其中只有一個正在被消費。我甚至能夠消費我的新消費者的消息,因爲消費者多於分區?

  2. 讓我們更復雜一些,並說我的服務在同一個用戶組和5個分區中有5個實例。 'n'重新啓動服務的所有5個實例之後,我甚至會保證在不確保現有消費者適當銷燬的情況下消費所有消息。即,在消費者創造期間,當消費者出門時,卡夫卡和卡夫卡休息代理人做了什麼?

  3. 什麼被認爲是卡夫卡休息代理的最佳實踐,以確保陳舊的消費者總是清理?你建議堅持消費者的網址?我應該強制重新啓動一個kafka-rest-proxy,以確保在啓動我的服務之前銷燬現有的消費者?

*編輯* 我相信我的問題的一部分,此配置回答,但不是全部。

consumer.instance.timeout.ms - 消費者實例自動銷燬之前的空閒時間量。 類型:int 默認:300000 重要性:低

回答

2
  1. 如果無法正常關閉消費者,將活着的最後一個請求後的一段是向它提出的。該代理將垃圾收集陳舊的消費者正是這種情況 - 如果它不是乾淨關閉,消費者將無限期地堅持一些分區。通過自動垃圾收集消費者,您不需要一些單獨的持久存儲來跟蹤您的消費者實例。正如您發現的,您可以通過config consumer.instance.timeout.ms來控制此超時。

  2. 由於實例將被垃圾收集,因此您將保證最終會使用所有消息。但在超時期間,有些分區仍然可能會分配給舊的使用者組,您不會在這些分區上取得任何進展。

  3. 理想情況下,您的應用程序不清潔關機很罕見,所以最佳做法是在應用程序關閉時清理消費者。即使在特殊情況下,您也可以使用區塊的try/catch/finally銷燬用戶。如果一個人活着,它最終會恢復。除此之外,如果您的應用程序可以容忍這一點,請考慮調整consumer.instance.timeout.ms設置爲較低。它只需要大於使用消費者的呼叫之間的最長時間(並且您應該記住可能的錯誤情況,例如,如果處理消息需要與另一個系統進行交互,並且該系統可能變得很慢/無法訪問,那麼您應該考慮當設置這個配置)。

你可以堅持的URL,但即使是在輸球的消費者,因爲你不能創造原子消費者和它的URL保存到其他一些持久性存儲的軌跡一定的風險。此外,由於完全不受控制的故障,如果您沒有清理機會的情況不應該是常見情況,那麼通常不會讓您受益。如果您需要從該故障中快速恢復,則無論如何,您的應用程序的消費者實例超時可能會顯着減少。

Re:強制重新啓動代理,這將是相當不常見的,因爲REST代理通常是共享服務,並且這樣做會影響所有其他正在使用它的應用程序。