我很難找到春季AMQP /兔子MQ中的排定/延遲郵件的方式,並在這裏找到解決方案。但我仍然與一個關於Spring AMQP/Rabbit MQ的prolem ,它可以沒有收到任何消息。春季amqp延遲郵件與rabbitMQ
@Configuration 公共類AmqpConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("172.16.101.14:5672");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
@Bean
CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange("my-exchange", "x-delayed-message", true, false, args);
}
@Bean
public Queue queue() {
return new Queue("spring-boot-queue", true);
}
@Bean
Binding binding(Queue queue, Exchange delayExchange) {
return BindingBuilder.bind(queue).to(delayExchange).with("spring-boot-queue").noargs();
}
@Bean
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
System.err.println("receive msg : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費
}
});
return container;
}
}
@Component 公共類發送實現RabbitTemplate.ConfirmCallback {
private RabbitTemplate rabbitTemplate;
@Autowired
public Send(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
}
public void sendMsg(String content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("my-exchange", "", content, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", 6000);
return message;
}
},correlationId);
System.err.println("delay message send ................");
}
/**
* 回調
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println(" callback id :" + correlationData);
if (ack) {
System.err.println("ok");
} else {
System.err.println("fail:" + cause);
}
}
:
我作爲下面的源
}
有沒有人可以提供幫助。
謝謝大家。
這就是我需要延遲郵件插件,你提到的RabbitMQ,感謝和最好的方面。 – squall
從發送者和接收者的角度看看你的交換減速 – lambodar