2016-02-05 113 views
2

我很難找到Spring AMQP/Rabbit MQ中計劃/延遲消息的方式。
經過多次搜索後,我仍然無法在Spring AMQP中做到這一點。有人可以告訴我如何在Spring AMQP中使用x-delay
我想延遲消息,如果消費者一方發生某種異常。 RabbitMQ的說要添加X-延遲和安裝的我已經有了這樣的插件,但還是消息不延時Spring AMQP中的計劃/延遲消息傳遞RabbitMq



我得到這消息
收到<(身體立刻正在添加:「[B @ 60a4ae5f (字節[26])'MessageProperties [頭= {X-延遲= 15000}

@Bean 
ConnectionFactory connectionFactory(){ 

    CachingConnectionFactory connectionFactory=new CachingConnectionFactory("127.0.0.1"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    connectionFactory.setPort(1500); 
    connectionFactory.setPublisherReturns(true); 
    return connectionFactory; 

} 

@Bean 
Binding binding(@Qualifier("queue")Queue queue, DirectExchange exchange) { 
    return new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), queue.getName(), null); 
    //return BindingBuilder.bind(queue).to(exchange).with(queueName); 
} 

@Bean 
DirectExchange exchange() { 
    DirectExchange exchange=new DirectExchange("delay-exchange"); 
    return exchange; 
} 

消費者---
@Override

public void onMessage(Message message, Channel channel) throws Exception { 

    System.out.println("Received <" + message+ ">" +rabbitTemplate); 

    if(i==1){ 
     AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder(); 
     Map<String,Object> headers = message.getMessageProperties().getHeaders(); 
     headers.put("x-delay", 15000); 
     props.headers(headers); 
     i++; 
     channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), 
       props.build(), message.getBody()); 
    } 
    } 
+1

不確定是誰推薦你用'jms'標記標記這個問題,但這是不正確的。這只是關於RabbitMQ。是的,'spring-amqp'。正在修復... –

回答

4

首先,您好像沒有與Scheduling Messages with RabbitMQ文章如下:

要使用延遲的消息交換,你只需要如下聲明提供「X-延遲的消息」交換式交換:

Map<String, Object> args = new HashMap<String, Object>(); 
args.put("x-delayed-type", "direct"); 
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); 

我會說同樣可以使用Spring AMQP來實現:

@Bean 
CustomExchange delayExchange() { 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-delayed-type", "direct"); 
    return new CustomExchange("my-exchange", "x-delayed-message", true, false, args); 
} 

另一個值得關注喲你真的應該發佈消息到delay-exchange,而不是任何其他。再說一遍:無論如何,該文檔都提到了這一點。

UPDATE

由於春季AMQP 1.6延遲的消息得到支撐外的現成功能: https://spring.io/blog/2016/02/16/spring-amqp-1-6-0-milestone-1-and-1-5-4-available

+0

是的,我沒有在Exchange中獲得這種x-delayed-message類型,因爲它不在Spring AMQP中。我經歷了很多次,但無法弄清楚它是一種Exchange類型。無論如何,我現在能夠做到。 –

+0

感謝張貼在Spring AMQP以及... –

+0

@Artem我應該在哪裏尋找spring-rabbit-1.6.xsd –