2016-05-17 145 views
1

我已經瀏覽了spring-cloud-stream 1.0.0.RELEASE的文檔,似乎找不到有關錯誤處理的任何文檔。spring-cloud-stream kafka錯誤處理

基於kafka 0.9的觀察,如果我的消費者拋出一個RuntimeException,我會看到3次重試。三個試之後,我看到這個在日誌中:

2016-05-17 09:35:59.216 ERROR 8983 --- [ kafka-binder-] o.s.i.k.listener.LoggingErrorHandler  : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]] 

org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message 

在這一點上,消費者偏移滯後1,如果我重新開始消費,消息被再次重試3次。但是,如果我然後將另一條消息發送到同一分區,以便消費者不會拋出異常,則會更新消費者偏移量,並且我們拋出異常的原始消息不會在重新啓動後重試。

這是記錄在哪裏,我沒有找到?錯誤處理綁定器是特定的,還是s-c-s抽象出在綁定器中保持一致?我懷疑這是消費者補償如何使用kafka活頁夾進行更新的意外後果。我看到一個enableDlq kafka消費者屬性被添加了,我即將對此進行測試,但我不確定如何處理kafka中的死信。我熟悉rabbitmq中的死信隊列,但通過rabbitmq,我們可以使用rabbitmq鏟子插件重新發布並重試dlq消息,以涵蓋發生故障的原因是臨時服務中斷。我不知道有任何類似的功能可用於kafka,但我們自己並沒有編寫類似的實用程序。

更新:啓用enableDlq kafka使用者屬性的測試顯示與錯誤處理相同的使用者偏移問題。當消費者拋出一個RuntimeException時,我看到3次重試,之後沒有記錄錯誤消息,並且我看到一條消息發佈到error.<destination>.<group>,但消費者偏移量沒有更新並滯後1.如果我重新啓動消費者,它會嘗試再次從原始主題分區處理相同的失敗消息,重試3次,並將相同的消息再次放入error.<destination>.<group>主題(重複的dlq消息)。如果我向另一個消息發佈消息不會拋出RuntimeException的同一主題分區,則偏移量會更新,並且在重新啓動時不再重試原始失敗消息。

我認爲當消費者拋出一個錯誤時,消費者應該更新kafka中的消費者偏移量,而不管enableDlq是否爲真。這至少可以使得所有重試嘗試失敗的消息都被丟棄(當enableDlq爲false時)或發佈到dlq並從不重試(當enableDlq爲true時)。

回答

1

看起來像一個錯誤,我 - 監聽器容器具有不被暴露粘結劑(或設置)屬性autoCommitOnErrorfalse默認情況下)。調用錯誤處理程序(發佈錯誤)後,如果布爾值爲true,則提交偏移量。

請在github作爲問題報告。

+0

感謝您的確認。 https://github.com/spring-cloud/spring-cloud-stream/issues/542 – gadams00