2017-04-07 122 views
0

我很難找到春季AMQP /兔子MQ中的排定/延遲郵件的方式,並在這裏找到解決方案。但我仍然與一個關於Spring AMQP/Rabbit MQ的prolem ,它可以沒有收到任何消息。春季amqp延遲郵件與rabbitMQ

@Configuration 公共類AmqpConfig {

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 
    connectionFactory.setAddresses("172.16.101.14:5672"); 
    connectionFactory.setUsername("admin"); 
    connectionFactory.setPassword("admin"); 
    connectionFactory.setPublisherConfirms(true); 
    return connectionFactory; 
} 


@Bean 
@Scope("prototype") 
public RabbitTemplate rabbitTemplate() { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    return template; 
} 


@Bean 
CustomExchange delayExchange() { 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-delayed-type", "direct"); 
    return new CustomExchange("my-exchange", "x-delayed-message", true, false, args); 
} 

@Bean 
public Queue queue() { 
    return new Queue("spring-boot-queue", true); 

} 

@Bean 
Binding binding(Queue queue, Exchange delayExchange) { 
    return BindingBuilder.bind(queue).to(delayExchange).with("spring-boot-queue").noargs(); 
} 

@Bean 
public SimpleMessageListenerContainer messageContainer() { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); 
    container.setQueues(queue()); 
    container.setExposeListenerChannel(true); 
    container.setMaxConcurrentConsumers(1); 
    container.setConcurrentConsumers(1); 
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 

    container.setMessageListener(new ChannelAwareMessageListener() { 

     public void onMessage(Message message, Channel channel) throws Exception { 
      byte[] body = message.getBody(); 
      System.err.println("receive msg : " + new String(body)); 
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費 

     } 
    }); 

    return container; 
} 

}

@Component 公共類發送實現RabbitTemplate.ConfirmCallback {

private RabbitTemplate rabbitTemplate; 

@Autowired 
public Send(RabbitTemplate rabbitTemplate) { 
    this.rabbitTemplate = rabbitTemplate; 
    this.rabbitTemplate.setConfirmCallback(this); 
    rabbitTemplate.setMandatory(true); 
} 

public void sendMsg(String content) { 

    CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); 

    rabbitTemplate.convertAndSend("my-exchange", "", content, new MessagePostProcessor() { 
     @Override 
     public Message postProcessMessage(Message message) throws AmqpException { 
      message.getMessageProperties().setHeader("x-delay", 6000); 
      return message; 
     } 
    },correlationId); 

    System.err.println("delay message send ................"); 

} 

/** 
* 回調 
*/ 
@Override 
public void confirm(CorrelationData correlationData, boolean ack, String cause) { 

    System.err.println(" callback id :" + correlationData); 

    if (ack) { 
     System.err.println("ok"); 
    } else { 
     System.err.println("fail:" + cause); 
    } 
} 

我作爲下面的源

}

有沒有人可以提供幫助。

謝謝大家。

回答

1

延遲消息傳遞與Spring amqp無關,它是一個駐留在代​​碼中的庫,因此庫不能保存任何消息。有兩種方法可以嘗試:

老方法: 設置TTL(生存時間)在每個消息/隊列頭(政策),然後引入DLQ處理它。一旦ttl過期,您的消息將從DLQ移動到主隊列,以便您的聽衆可以處理它。

最新方法: 最近的RabbitMQ想出了RabbitMQ的延遲郵件插件,利用它可以實現的,因爲RabbitMQ的-3.5.8相同的,這個插件支持可用。

您可以使用類型x-delayed-message聲明交換,然後使用自定義標題x-delay以毫秒爲單位發佈消息,以表示消息的延遲時間。該消息將在X-delay毫秒

這裏更多地傳遞到相應的隊列:git

+0

這就是我需要延遲郵件插件,你提到的RabbitMQ,感謝和最好的方面。 – squall

+0

從發送者和接收者的角度看看你的交換減速 – lambodar