我已經瀏覽了spring-cloud-stream 1.0.0.RELEASE的文檔,似乎找不到有關錯誤處理的任何文檔。spring-cloud-stream kafka錯誤處理
基於kafka 0.9的觀察,如果我的消費者拋出一個RuntimeException,我會看到3次重試。三個試之後,我看到這個在日誌中:
2016-05-17 09:35:59.216 ERROR 8983 --- [ kafka-binder-] o.s.i.k.listener.LoggingErrorHandler : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]]
org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message
在這一點上,消費者偏移滯後1,如果我重新開始消費,消息被再次重試3次。但是,如果我然後將另一條消息發送到同一分區,以便消費者不會拋出異常,則會更新消費者偏移量,並且我們拋出異常的原始消息不會在重新啓動後重試。
這是記錄在哪裏,我沒有找到?錯誤處理綁定器是特定的,還是s-c-s抽象出在綁定器中保持一致?我懷疑這是消費者補償如何使用kafka活頁夾進行更新的意外後果。我看到一個enableDlq kafka消費者屬性被添加了,我即將對此進行測試,但我不確定如何處理kafka中的死信。我熟悉rabbitmq中的死信隊列,但通過rabbitmq,我們可以使用rabbitmq鏟子插件重新發布並重試dlq消息,以涵蓋發生故障的原因是臨時服務中斷。我不知道有任何類似的功能可用於kafka,但我們自己並沒有編寫類似的實用程序。
更新:啓用enableDlq kafka使用者屬性的測試顯示與錯誤處理相同的使用者偏移問題。當消費者拋出一個RuntimeException時,我看到3次重試,之後沒有記錄錯誤消息,並且我看到一條消息發佈到error.<destination>.<group>
,但消費者偏移量沒有更新並滯後1.如果我重新啓動消費者,它會嘗試再次從原始主題分區處理相同的失敗消息,重試3次,並將相同的消息再次放入error.<destination>.<group>
主題(重複的dlq消息)。如果我向另一個消息發佈消息不會拋出RuntimeException的同一主題分區,則偏移量會更新,並且在重新啓動時不再重試原始失敗消息。
我認爲當消費者拋出一個錯誤時,消費者應該更新kafka中的消費者偏移量,而不管enableDlq是否爲真。這至少可以使得所有重試嘗試失敗的消息都被丟棄(當enableDlq爲false時)或發佈到dlq並從不重試(當enableDlq爲true時)。
感謝您的確認。 https://github.com/spring-cloud/spring-cloud-stream/issues/542 – gadams00