2016-10-03 115 views
0

我消耗與彈簧集成-卡夫卡消息,使用message-driven-channel-adapter如何正確處理消息時的錯誤?

<int-kafka:message-driven-channel-adapter 
    id="kafkaListener" 
    listener-container="container1" 
    channel="outputFromKafka" 
    error-channel="errorChannel"/> 

容器使用一個JsonDeserializer到傳入的JSON反序列化到對象:

<beans:bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"> 
    <beans:constructor-arg> 
     <beans:bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> 
      <beans:constructor-arg> 
       <beans:map> 
        <beans:entry key="bootstrap.servers" value="localhost:9092" /> 
        <beans:entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> 
        <beans:entry key="group.id" value="mygroup" /> 
       </beans:map> 
      </beans:constructor-arg> 
      <beans:property name="valueDeserializer"> 
       <beans:bean class="org.springframework.kafka.support.serializer.JsonDeserializer"> 
        <beans:constructor-arg value="com.foo.MyType"/> 
       </beans:bean> 
      </beans:property> 
     </beans:bean> 
    </beans:constructor-arg> 
    <beans:constructor-arg> 
     <beans:bean class="org.springframework.kafka.listener.config.ContainerProperties"> 
      <beans:constructor-arg name="topics" value="foo" /> 
     </beans:bean> 
    </beans:constructor-arg> 
</beans:bean> 

如果該消息可以(例如因爲消費者不小心使用了錯誤的類型),拋出異常:

ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion: org.apache.kafka.common.errors.SerializationException: Can't deserialize data ... 

之後,適配器再次收到相同的消息(可能是因爲最後一個沒有提交?),並且以完全相同的方式失敗,導致層出不窮的異常。

看起來好像沒有使用已配置的error-channel

有什麼辦法來處理這樣的錯誤,以及它是如何在XML中配置的?

回答

0

看起來好像沒有使用配置的錯誤通道。

是什麼讓你相信?

我只是跑設置爲errorChannel(默認,與一家伐木適配器)錯誤通道測試...

12:06:08.366 [container-kafka-consumer-1] ERROR o.s.i.handler.LoggingHandler - ... 

您可以爲org.springframework.integration調試日誌片斷呈現出故障?

編輯

哦,對不起......

ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion: org.apache.kafka.common.errors.SerializationException: Can't deserialize data ... 

這是在棧中低得多下來Spring集成得到消息之前。

+0

請參閱我的編輯 - 您需要自定義反序列化器以捕獲異常並返回指示解序器失敗的內容。即一些「特殊的」MyType實例。 –

+0

你是對的;由'org.apache.kafka.common.serialization.Deserializer'拋出的異常僅僅被Kafka客戶端代碼捕獲並記錄下來,而沒有任何可能進行干預。 使用反序列化類型的魔術實例在技術上是可行的,但是違反了我的一些原則;) – Tom

+0

也許使用Kafka值解串器對於JSON並不是一個好的選擇。相反,我認爲,一個StringDeserializer和一個單獨的json到對象轉換器可以提供更多的靈活性和健壯性。 – Tom