0
我消耗與彈簧集成-卡夫卡消息,使用message-driven-channel-adapter
:如何正確處理消息時的錯誤?
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
channel="outputFromKafka"
error-channel="errorChannel"/>
容器使用一個JsonDeserializer
到傳入的JSON反序列化到對象:
<beans:bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<beans:constructor-arg>
<beans:bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<beans:constructor-arg>
<beans:map>
<beans:entry key="bootstrap.servers" value="localhost:9092" />
<beans:entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
<beans:entry key="group.id" value="mygroup" />
</beans:map>
</beans:constructor-arg>
<beans:property name="valueDeserializer">
<beans:bean class="org.springframework.kafka.support.serializer.JsonDeserializer">
<beans:constructor-arg value="com.foo.MyType"/>
</beans:bean>
</beans:property>
</beans:bean>
</beans:constructor-arg>
<beans:constructor-arg>
<beans:bean class="org.springframework.kafka.listener.config.ContainerProperties">
<beans:constructor-arg name="topics" value="foo" />
</beans:bean>
</beans:constructor-arg>
</beans:bean>
如果該消息可以(例如因爲消費者不小心使用了錯誤的類型),拋出異常:
ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion: org.apache.kafka.common.errors.SerializationException: Can't deserialize data ...
之後,適配器再次收到相同的消息(可能是因爲最後一個沒有提交?),並且以完全相同的方式失敗,導致層出不窮的異常。
看起來好像沒有使用已配置的error-channel
。
有什麼辦法來處理這樣的錯誤,以及它是如何在XML中配置的?
請參閱我的編輯 - 您需要自定義反序列化器以捕獲異常並返回指示解序器失敗的內容。即一些「特殊的」MyType實例。 –
你是對的;由'org.apache.kafka.common.serialization.Deserializer'拋出的異常僅僅被Kafka客戶端代碼捕獲並記錄下來,而沒有任何可能進行干預。 使用反序列化類型的魔術實例在技術上是可行的,但是違反了我的一些原則;) – Tom
也許使用Kafka值解串器對於JSON並不是一個好的選擇。相反,我認爲,一個StringDeserializer和一個單獨的json到對象轉換器可以提供更多的靈活性和健壯性。 – Tom