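Spring Boot RabbitMQ DLE is not receiving any messages

I am working with Spring Boot and RabbitMQ. I created a dead letter queue, and in the RabbitMQ management UI I can see it flagged as "D, DLE", but there is no DLK. Perhaps I am missing the "x-dead-letter-routing-key" setting, but I do not want a routing key. A few of my consumers are bound to a specific exchange, and I am creating a DLE for each exchange; if there is any problem in a consumer of that exchange, the DLE connected to that exchange should receive the message and run the user-dependent logic. Unfortunately this does not work: the DLE does not receive any messages.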

Please find the code below:

package com.sample.rabbit; 

import org.slf4j.Logger; 
import org.springframework.amqp.core.Message; 
import org.springframework.amqp.rabbit.annotation.Argument; 
import org.springframework.amqp.rabbit.annotation.QueueBinding; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; 
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; 
import org.springframework.amqp.support.converter.DefaultClassMapper; 
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.context.annotation.Bean; 
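import org.springframework.amqp.support.converter.MessageConversionException; 
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException; 
import org.springframework.messaging.handler.annotation.support.MethodArgumentTypeMismatchException; 
import org.springframework.util.ErrorHandler; 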

@SpringBootApplication 
public class SampleRabbitApplication { 

public static void main(String[] args) throws Exception { 
    ConfigurableApplicationContext context = SpringApplication.run(SampleRabbitApplication.class, args); 
    context.getBean(SampleRabbitApplication.class).runDemo(); 
    context.close(); 
} 

@Autowired 
private RabbitTemplate template; 

private void runDemo() throws Exception { 
    this.template.convertAndSend("sample-queue", new Foo("bar"),m -> { 
     m.getMessageProperties().setHeader("__TypeId__","foo"); 
     return m; 
    }); 

    this.template.convertAndSend("sample-queue", new Foo("throw"),m -> { 
     m.getMessageProperties().setHeader("__TypeId__","foo"); 
     return m; 
    }); 
    this.template.convertAndSend("sample-queue", new Foo("bar"), m -> { 
     return new Message("some bad json".getBytes(), m.getMessageProperties()); 
    }); 
    Thread.sleep(5000); 
} 

@RabbitListener(
     id = "sample-queue", 
     bindings = @QueueBinding(
       value = @org.springframework.amqp.rabbit.annotation.Queue(value = "sample-queue", durable = "true"), 
       exchange = @org.springframework.amqp.rabbit.annotation.Exchange(value = "sample.exchange", durable = "true") 
     ) 
) 
public void handle(Foo in) { 
    System.out.println("Received: " + in); 
if("throw".equalsIgnoreCase(in.getFoo())){ 
     throw new BadRequestException("Foo contains throw so it throwed the exception."); 
    } 
} 

@RabbitListener(
     id = "sample-dead-letter-queue", 
     bindings = @QueueBinding(
       value = @org.springframework.amqp.rabbit.annotation.Queue(value = "sample-dead-letter-queue", durable = "true", arguments = {@Argument(name = "x-dead-letter-exchange",value = "sample.exchange"),@Argument(name = "x-dead-letter-routing-key",value = "#")}), 
       exchange = @org.springframework.amqp.rabbit.annotation.Exchange(value = "critical.exchange", durable = "true",type = "topic") 
     ) 
) 
public void handleDLE(Message in) { 
    System.out.println("Received in DLE: " + in.getBody()); 
} 

@Bean 
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory); 
    factory.setMessageConverter(jsonConverter()); 
    factory.setErrorHandler(errorHandler()); 
    return factory; 
} 

@Bean 
public ErrorHandler errorHandler() { 
    return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy()); 
} 

@Bean 
public MessageConverter jsonConverter() { 
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); 
    DefaultClassMapper mapper = new DefaultClassMapper(); 
    mapper.setDefaultType(Foo.class); 
    converter.setClassMapper(mapper); 
    return new Jackson2JsonMessageConverter(); 
} 

public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { 

    private final Logger LOG = org.slf4j.LoggerFactory.getLogger(getClass()); 

    public boolean isFatal(Throwable t) { 
     if (t instanceof ListenerExecutionFailedException && isCauseFatal(t.getCause())) { 
      //To do : Here we have to configure DLE(Critical queue) and put all the messages in the critical queue. 
      ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t; 
      if(lefe.getFailedMessage() != null) { 
       LOG.info("Failed to process inbound message from queue " 
         + lefe.getFailedMessage().getMessageProperties().getConsumerQueue() 
         + "; failed message: " + lefe.getFailedMessage(), t); 
      } else { 
       LOG.info("Failed to process inbound message from queue " 
         + lefe.getMessage(), t); 
      } 
     } 
     return super.isFatal(t); 
    } 

    private boolean isCauseFatal(Throwable cause) { 
     return cause instanceof MessageConversionException 
       || cause instanceof org.springframework.messaging.converter.MessageConversionException 
       || cause instanceof MethodArgumentNotValidException 
       || cause instanceof MethodArgumentTypeMismatchException 
       || cause instanceof NoSuchMethodException 
       || cause instanceof ClassCastException 
       || isUserCauseFatal(cause); 
    } 

    /** 
    * Subclasses can override this to add custom exceptions. 
    * @param cause the cause 
    * @return true if the cause is fatal. 
    */ 
    protected boolean isUserCauseFatal(Throwable cause) { 
     return true; 
    } 


} 

public static class Foo { 

    private String foo; 

    public Foo() { 
     super(); 
    } 

    public Foo(String foo) { 
     this.foo = foo; 
    } 

    public String getFoo() { 
     return this.foo; 
    } 

    public void setFoo(String foo) { 
     this.foo = foo; 
    } 

    @Override 
    public String toString() { 
     return "Foo [foo=" + this.foo + "]"; 
    } 

} 
} 

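My exchanges and queues are direct. Each of my consumers uses a different routing key, but they all belong to the same exchange. So how can I write a single DLE that consumes all failed messages? In the code sample above, one message succeeds and another fails, but I cannot see the failed message in the DLE.

Any help would be greatly appreciated.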

Answer

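If you configure a queue with a dead letter exchange (DLX) but no dead letter routing key, the message is routed to the DLX with its original routing key.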

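The simplest way to handle your use case is to make the DLX a topic exchange and bind a queue to it with the routing key # (the wildcard that matches all messages); all errors will then go to that queue.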
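As a rough illustration of the two rules above, here is a plain-Java model. It is not RabbitMQ or Spring code and does not talk to a broker. It shows that the original routing key is kept when no x-dead-letter-routing-key is set, and that a topic binding of # matches any routing key. The class name, the method names and the orders.* keys are made up for this sketch.

```java
/** Plain-Java model of two broker rules; hypothetical names, no broker involved. */
final class DeadLetterRoutingSketch {

    /** Key used when dead-lettering: the configured dead letter routing key if set, else the original key. */
    static String deadLetterKey(String originalKey, String configuredDeadLetterKey) {
        return configuredDeadLetterKey != null ? configuredDeadLetterKey : originalKey;
    }

    /** Topic match: words are separated by '.', '*' is exactly one word, '#' is zero or more words. */
    static boolean topicMatches(String bindingKey, String routingKey) {
        return match(words(bindingKey), 0, words(routingKey), 0);
    }

    private static String[] words(String key) {
        // Treat an empty key as zero words; keep trailing empty words otherwise.
        return key.isEmpty() ? new String[0] : key.split("\\.", -1);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // pattern used up: match only if the key is used up too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more of the remaining words
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // pattern still needs a word, but the key has none left
        }
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(key[j]);
        return wordMatches && match(pattern, i + 1, key, j + 1);
    }

    public static void main(String[] args) {
        // The question publishes with routing key "sample-queue" and sets no dead letter routing key.
        String key = deadLetterKey("sample-queue", null);
        System.out.println("dead-lettered with key: " + key);
        System.out.println("'#' binding matches: " + topicMatches("#", key));
    }
}
```

Because # matches every key, a single queue bound this way collects the failures from every consumer queue that names the same DLX.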

If you want to segregate the errors into individual queues, bind each DLQ with the original routing key(s).
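A minimal sketch of that per-key variant follows. It assumes the original messages carry the routing key "sample-queue" (as in the question's convertAndSend calls) and that a container factory bean named noJsonContainerFactory without a JSON converter is defined elsewhere in the application. The queue name "sample-queue.dlq", the listener id and the class name are hypothetical.

```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class PerKeyDeadLetterListeners {

    // Hypothetical DLQ that only receives messages dead-lettered with the
    // original routing key "sample-queue"; add one such listener per key.
    @RabbitListener(id = "sample-queue-dlq", containerFactory = "noJsonContainerFactory",
            bindings = @QueueBinding(
                    value = @Queue(value = "sample-queue.dlq", durable = "true"),
                    exchange = @Exchange(value = "critical.exchange", durable = "true", type = "topic"),
                    key = "sample-queue"))
    public void handleSampleQueueFailures(Message in) {
        // Raw Message: the failed payload may not be valid JSON, so no conversion is attempted.
        System.out.println("Dead-lettered from sample-queue: " + new String(in.getBody()));
    }
}
```

If a catch-all queue bound with # exists on the same exchange, it receives a copy of these messages as well, since a topic exchange delivers to every matching binding.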

EDIT

Here is the correct configuration:

@RabbitListener(id = "sample-queue", 
     bindings = @QueueBinding(value = @Queue(value = "sample-queue", durable = "true", arguments = 
         @Argument(name = "x-dead-letter-exchange", value = "critical.exchange")), 
        exchange = @Exchange(value = "sample.exchange", durable = "true"))) 
public void handle(Foo in) { 
    System.out.println("Received: " + in); 
} 

@RabbitListener(id = "sample-dead-letter-queue", containerFactory = "noJsonContainerFactory", 
     bindings = @QueueBinding(value = @Queue(value = "sample-dead-letter-queue", durable = "true"), 
      exchange = @Exchange(value = "critical.exchange", durable = "true", type = "topic"), 
      key = "#")) 
public void handleDLE(Message in) { 
    System.out.println("Received in DLE: " + new String(in.getBody())); 
} 

@Bean 
public SimpleRabbitListenerContainerFactory noJsonContainerFactory(ConnectionFactory connectionFactory) { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory); 
    factory.setErrorHandler(errorHandler()); 
    return factory; 
} 

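@Gary, thanks a lot for your reply. I tried what you suggested, but it does not work for me; I still cannot get the failed message. Could you please check my code? My DLE uses a separate exchange, which is a topic exchange, while the actual consumers use a different (direct) exchange. Is that OK? I get a "MessageConversionException" and it reaches the "isFatal" part of "ConditionalRejectingErrorHandler", but I cannot get that message in the DLE. Please guide me to resolve this. – VelNaga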

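Your dead letter configuration is wrong. It needs to go on the original queue. Also, you can't use the same converter to receive from the DLQ; see my edit. –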

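Yes, now it goes to the DLE, but on the RabbitMQ management page the dead letter queue is not marked with DLX or DLK, while the consumer queue is marked as DLX. Isn't that a problem? – VelNaga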