2014-09-11 66 views
2

我正在使用SpringBoot來啓動連接到RabbitMQ隊列的SpringAMQP應用程序。我希望能夠從生產者發送消息,指定回覆隊列,以便消費者只需要發送而不必調查目的地(因此不必在消息本身中傳遞迴複數據)。回覆在SpringAMQP被預先設置?

這是我

private static final String QUEUE_NAME = "testQueue"; 
private static final String ROUTING_KEY = QUEUE_NAME; 
public static final String REPLY_QUEUE = "replyQueue"; 
private static final String USERNAME = "guest"; 
private static final String PASSWORD = "guest"; 
private static final String IP = "localhost"; 
private static final String VHOST = "/"; 
private static final int PORT = 5672; 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    amqpAdmin().declareQueue(new Queue(QUEUE_NAME)); 
    amqpAdmin().declareQueue(new Queue(REPLY_QUEUE)); 
    return template; 
} 

@Bean 
public AmqpAdmin amqpAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(IP); 
    connectionFactory.setUsername(USERNAME); 
    connectionFactory.setPassword(PASSWORD); 
    connectionFactory.setVirtualHost(VHOST); 
    connectionFactory.setPort(PORT); 
    return connectionFactory; 
} 

我發送消息如下(生產者和消費者之間共享的)的配置:

public Object sendAndReply(String queue, String content){ 
     return template.convertSendAndReceive(queue, new Data(content), new MessagePostProcessor() { 

      @Override 
      public Message postProcessMessage(Message message) throws AmqpException { 
       message.getMessageProperties().setReplyTo(ReplyTester.REPLY_QUEUE); 
       return message; 
      } 
     }); 
    } 

和等待答覆如下:

public void replyToQueue(String queue){ 
    template.receiveAndReply(queue, new ReceiveAndReplyCallback<Data, Data>() { 
     @Override 
     public Data handle(Data payload) { 
      System.out.println("Received: "+payload.toString()); 
      return new Data("This is a reply for: "+payload.toString()); 
     } 
    }); 
} 

然而,當發送時,我得到以下例外:

Exception in thread "main" org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property. 
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:66) 
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:112) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:841) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:820) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:705) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:697) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:673) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:663) 
    at prodsend.Prod.sendAndReply(ReplyTester.java:137) 
    at prodsend.ReplyTester.sendMessages(ReplyTester.java:49) 
    at prodsend.ReplyTester.main(ReplyTester.java:102) 
Caused by: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property. 
    at org.springframework.util.Assert.isNull(Assert.java:89) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:711) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:705) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:835) 
    ... 8 more 

該行ReplyTest.137指向上述sendAndReply方法中的return行。


編輯: 這裏是:)

class Data{ 
    public String d; 
    public Data(String s){ d = s; } 
    public String toString() { return d; } 
} 

回答

6

documentation上述數據類:

基本RPC圖案。使用特定的路由鍵將消息發送到默認交換機,並嘗試接收響應。實現通常會將回復頭部設置爲專用隊列,並等待一段時間受限於超時。

所以該方法convertSendAndReceive處理設置replyTo報頭,並返回一個Messaage - 的響應。這是一個同步模式 - RPC。

如果你想這樣做異步 - 你似乎 - 不使用這種方法。使用適當的convertAndSend方法並使用適當的MessagePostProcessor添加您的replyTo標頭。

由於這是異步的,您需要註冊一個單獨的處理程序以接收答覆。這需要在之前發送消息給對方。該處理程序將在發送消息後的某個時刻被調用 - 何時未知。閱讀部分3.5.2異步消費者Spring AQMP Documentation

所以,異步處理流程:

  1. 發件人寄存器上replyTo queueue
  2. 發送的處理程序發送帶有replyTo消息集
  3. 客戶端調用receiveAndReply,處理該消息,並且向replyTo回覆
  4. 發件人回叫方法被觸發

同步處理流程是:

  1. 發件人使用sendAndReceive和塊
  2. 客戶端調用receiveAndReply,處理該消息,並且向replyTo
  3. 發件人發送回覆接收應答發送消息時,喚醒和流程它

所以後者的情況下需要發件人等待。由於您使用receiveXXX而不是註冊異步處理程序,因此如果客戶端需要一段時間才能調用receiveXXX,發件人可能會等待很長時間。順便說一句,如果你想使用同步方法,但使用特定的replyTo,你可以隨時撥打setReplyQueue。對於這種情況,我還提到了一個setReplyTimeout,客戶要麼不費心閱讀消息,要麼忘記回覆。

+0

對於異步方法,會採取什麼措施? – mangusbrother 2014-09-11 08:29:24

+0

@mangusbrother更新了答案 - 比這更復雜一點。是不是總是? – 2014-09-11 08:38:42

相關問題