2017-03-13 64 views
0

爲了使用延遲交換,我通過發送「INT:網關」消息的RabbitMQ用這種方法:如何從RabbitMQ的獲得頭與Spring集成

void send(@Payload Notification request, @Header(MessageProperties.X_DELAY) Integer delay, @Header("routingKey") String routingKey); 

我可以在RabbitMQ的看到,標題正確顯示: x-delay:-60000

但是,當我從RabbitMQ收到此消息時,如何獲取此標頭?

到目前爲止,我正在接收我以前作爲Json發送的對象,但是如果我嘗試獲取標題,我將得到一個異常。

發送:

integration.xml文件:

<!-- Producing service --> 
    <int:gateway id="gateway" default-request-channel="producingChannel" service-interface="Gateway"/> 
    <!-- Producing service --> 


<!-- Service => RabbitMQ (Producing) --> 
    <int:chain input-channel="producingChannel"> 
     <int:object-to-json-transformer/> 
     <int-amqp:outbound-channel-adapter exchange-name="${queuing.notifications-exchange}" routing-key-expression="headers.routingKey" mapped-request-headers="*"/> 
    </int:chain> 
    <!-- Service => RabbitMQ (Producing) --> 

網關中的Java文件:

void send(@Payload Notification request, @Header(MessageProperties.X_DELAY) Integer delay, @Header("routingKey") String routingKey); 

接收:

integration.xml文件:

<!-- RabbitMQ => Service (Consuming) --> 
    <int-amqp:inbound-channel-adapter channel="consumingChannel" queue-names="${queuing.operator.queue}" concurrent-consumers="${queuing.concurrent-consumers}" prefetch-count="${queuing.prefetch-count}" mapped-request-headers="*" error-channel="errorChannel" /> 
    <!-- RabbitMQ => Service (Consuming) --> 


<!-- Routing --> 
<int:chain input-channel="consumingChannel"> 
    <int:json-to-object-transformer type="Notification"/> 
    <int:service-activator ref="workingService" method="processNotificationFromQueue"/> 
</int:chain> 
<!-- Routing --> 

在Java文件WorkingService:

public void processNotificationFromQueue(Notification notification, 
      @Header(MessageProperties.X_DELAY) Integer delay) { ... 
} 

異常被拋出在這裏:

Caused by: java.lang.IllegalArgumentException: required header not available: x-delay 

回答

0

你必須使用AmqpHeaders.RECEIVED_DELAY代替。

由於您使用正確mapped-request-headers="*"默認DefaultAmqpHeaderMapper映射正確:

Integer receivedDelay = amqpMessageProperties.getReceivedDelay(); 
if (receivedDelay != null) { 
    headers.put(AmqpHeaders.RECEIVED_DELAY, receivedDelay); 
} 
+0

謝謝!現在按我的預期工作。 –

+0

這個不同的標頭用於避免從入站rabbitmq消息到出站的意外傳播。 –