2016-05-13 401 views
1

我正在Java中使用RabbitMQ。
我有兩個RabbitMQ服務器,具有相同的配置,一個是開發環境,另一個是生產環境。
這是對消費者的聲明:java - RabbitMQ消費者沒有收到消息

/* 
    * Connection and channel declaration 
    */ 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setUri(prop.getProperty("ConnectionURI")); 
    connection = factory.newConnection(); 
    channel = connection.createChannel(); 

    /* 
    * Queue declaration and exchange binding 
    */ 
    channel.exchangeDeclare(prop.getProperty("printExchange"), "topic", false, false, false, new HashMap<>()); 
    queueName = prop.getProperty("printQueue"); 
    routing_key = "print." + codCliente + "." + idCassa; 
    channel.queueDeclare(queueName, false, false, false, null); 
    channel.queueBind(queueName, prop.getProperty("printExchange"), routing_key); 

這裏,它開始偵聽的隊列:

JAyronPOS.LOGGER.info("Waiting for a message on the queue -> " + queueName + " with routingkey -> " + routing_key); 
Consumer consumer = new DefaultConsumer(channel) { 
    @Override 
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
     JAyronPOS.LOGGER.info("This is the received message -> " + queueName + ": " + new String(body, "UTF-8")); 
     Map<String, Object> headers = properties.getHeaders(); 
     if (envelope.getRoutingKey().equals(routing_key)) { 
      JAyronPOS.LOGGER.info("Message is for me, because it has my routing key"); 
      channel.basicAck(envelope.getDeliveryTag(), false); 
      if (headers != null) { 
       if (headers.containsKey("command")) { 
        JAyronPOS.LOGGER.info("It's a command!"); 
        JAyronPOS.LOGGER.info(headers.get("command").toString()); 
        if ("requestClose".equals(headers.get("command").toString())) { 
         ChiusuraFiscaleConfirm confirm = gson.fromJson(new String(body, "UTF-8"), ChiusuraFiscaleConfirm.class); 
         if (confirm.isCanClose()) { 
          eseguiChiusuraFiscale(); 
         } else { 
          JOptionPane.showMessageDialog(null, "Can't close", "Error", JOptionPane.ERROR_MESSAGE); 
         } 
        } else { 
         JAyronPOS.LOGGER.info("Can't handle the message"); 
        } 
       } 
      } else { 
       System.out.println("It's a ticket"); 
       TicketWrapper ticket = gson.fromJson(new String(body, "UTF-8"), TicketWrapper.class); 
       printTicket(ticket); 
      } 
     }else{ 
      JAyronPOS.LOGGER.info("The message isn't for me, because it has the routingkey: "+envelope.getRoutingKey()); 
     } 
    } 
}; 
channel.basicConsume(queueName, false, consumer); 

在開發環境中,我有最多5個隊列,而在生產環境中,我有在150到200個隊列之間。
消息由交換機發送,並帶有個人路由密鑰。發送的消息數量不高(強調時不超過10 msg/s)。
當我在開發環境中測試消費者時,一切正常:
- 我發送一個RPC調用,服務器對它進行處理並回復。消費者閱讀回覆並調用正確的方法。全部在大約1-2秒。
當我在生產環境中使用軟件時(我只是通過在config.properties文件中註釋/分解一行來更改環境),它不起作用:
- 我發送RPC調用,服務器對它進行處理,在隊列上發送回覆。消費者永遠不會收到消息(但我可以看到Web管理面板在隊列中開發的消息)。

這可能是問題嗎?

編輯:我注意到,如果我發送RPC調用,在RabbitMQ web面板中的回覆隊列中,在「Deliver」(淺藍色)下有一條消息,而如果我發送3-4個RPC調用(與前一個相同),在某個呼叫之後,在回覆隊列中,在發佈(黃色)下有一條消息,並且消費者接收到回覆。

回答

0

您尚未提供發佈代碼或拓撲結構,因此很難猜測拓撲如何工作。

但是在消費者上匹配路由密鑰是個壞主意,交換應該爲你做。消費者可以爲所需的路由密鑰創建和綁定隊列。 在您的代碼else分支在消費者不承認或拒絕的消息,這可能會導致它掛起在delivered狀態,永遠不會被其他消費者接收。

Rabbitmq教程有用的RPC部分https://www.rabbitmq.com/tutorials/tutorial-six-java.html

相關問題