2016-12-05 61 views
2

我以前用JMS非常成功地使用過Spring集成,但現在我們正在使用RabbitMQ/AMQP並且在處理錯誤時遇到了一些問題。春季整合AMQP - 儘管重試建議不斷重試

我有一個int-amqp:入站通道適配器與errorChannel設置爲接收任何異常,在這裏ErrorTransformer類檢查失敗的消息的原因異常。然後,根據例外任一類型: -

  1. 抑制異常,並轉換成一個JSON對象,可以去AMQP出站通道適配器作爲企業回覆解釋的失敗。在這裏我想要消耗/確認原始消息。

  2. 或者重新引發異常讓RabbitMQ重新傳遞消息一定次數。

我發現,重新投擲造成不斷重新提供的消息,我再瞭解StatefulRetryOperationsInterceptorFactoryBean,所以增加了一個建議鏈重試3次,然後我得到任何消息-ID異常,所以還在建議鏈的開頭添加了一個'MissingMessageIdAdvice'。

儘管有建議,我仍然得到連續重試爲從errorChannel的ErrorTransformer重新拋出的RuntimeException。我通過RabbitMQ管理員僅使用默認值發佈消息。不知道是否缺少一個消息ID使這不起作用,如果是這樣,我怎麼得到一個消息有一個ID? 我對以下兩者之間的差異感到困惑:A)ConditionalRejectingErrorHandler(我設置爲入站適配器的錯誤處理程序),它允許我提供一個customFatalExceptionStrategy來實現isFatal()。我認爲fatal = true(意思是DISCARD)並且消息被消耗並丟棄的地方,但是我怎樣才能發送出站故障消息?

B)和我用於檢查異常並轉換爲出站失敗響應消息的入站適配器上的errorChannel。在這裏我想我可以拋出AmqpRejectAndDontRequeueException,但是爲什麼還要有ConditionalRejectingErrorHandler呢?並將thorwing AmqpRejectAndDontRequeueException工作

 <int-amqp:inbound-channel-adapter id="amqpInRequestPatternValuation" channel="requestAmqpIn" channel-transacted="true" transaction-manager="transactionManager" 
     queue-names="requestQueue" error-channel="patternValuationErrorChannel" connection-factory="connectionFactory" 
     receive-timeout="59000" concurrent-consumers="1" 
     advice-chain="retryChain" error-handler="customErrorHandler" /> 

<bean id="customErrorHandler" class="org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler"> 
     <constructor-arg ref="customFatalExceptionStrategy"/> 
    </bean>        


<bean id="customFatalExceptionStrategy" class="abc.common.CustomFatalExceptionStrategy"/> 



    <!-- ADVICE CHAIN FOR CONTROLLING NUMBER OF RE-TRIES before sending to DLQ (or discarding if no DLQ) without this any re-queued fatal message will retry forever --> 

    <util:list id="retryChain"> 
     <bean class="org.springframework.amqp.rabbit.retry.MissingMessageIdAdvice"> 
      <constructor-arg> 
       <bean class="org.springframework.retry.policy.MapRetryContextCache" /> 
      </constructor-arg> 
     </bean> 
     <ref bean="retryInterceptor" /> 
    </util:list> 

<bean id="retryInterceptor" 
    class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean"> 
    <property name="retryOperations" ref="retryTemplate" /> 
    <property name="messageRecoverer" ref="messageRecoverer"/> 
</bean> 

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> 
    <property name="retryPolicy" ref="simpleRetryPolicy" /> 
    <property name="backOffPolicy"> 
      <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> 
       <property name="initialInterval" value="10000" /> 
      </bean> 
    </property> 
</bean> 

    <bean id="simpleRetryPolicy" class="org.springframework.retry.policy.SimpleRetryPolicy"> 
    <property name="maxAttempts" value="3" /> 
</bean> 
+0

是的,那很難處理你的話題。也許把它切成幾個SO問題會更好? –

+0

但是每個問題都是緊密相關的,所以我會很感激任何建議或解釋。 – Pete

+0

我已將我的帖子簡化爲一個問題,並將發佈另一個關於單獨的點 – Pete

回答

0

我從來沒有想過發佈解決方案,所以在這裏。

一旦我將重試通知鏈配置爲必須包含RejectAndDontRequeueRecoverer(我相信也是默認值)的messageRecoverer的AMQP入站通道適配器。我缺少的重要一點是確保發件人發送郵件時包含message_id。因此,如果通過RabbitMQ管理控制檯發佈,我需要包含預定義的message_id屬性並提供一個值。

使用'MissingMessageIdAdvice'並沒有幫助(所以我刪除了),因爲它會產生一個不同的message_id每個重新交付的消息導致重試計數不增加,因爲每個交付被認爲是不同的最後

0

你必須使用RejectAndDontRequeueRecoverer停在重試結束再發貨:

* MessageRecover that causes the listener container to reject 
* the message without requeuing. This enables failed messages 
* to be sent to a Dead Letter Exchange/Queue, if the broker is 
* so configured. 

是的,messageId是對於重試用例很重要。

如果在發送期間無法手動提供,則可以注入自定義MessageKeyGenerator策略以確定來自消息的唯一密鑰。