2015-07-11 245 views
2

我在我的日食中運行優先隊列java program,我遇到了問題,第一次得到正確的答案。還有一次,我在隊列中增加了一條消息,但這次我得到了不同的結果。RabbitMQ消息優先級隊列不工作

public static void main(String[] argv) throws Exception { 
    ConnectionFactory factory = new ConnectionFactory(); 
     Connection conn = factory.newConnection(); 
     Channel ch = conn.createChannel(); 
     Map<String, Object> args = new HashMap<String, Object>(); 
     args.put("x-max-priority", 10); 
     ch.queueDeclare(QUEUE_UPDATE, true, false, false, args); 

     publish(ch, 141); 
     publish(ch, 250); 

     final CountDownLatch latch = new CountDownLatch(2); 
     ch.basicConsume(QUEUE_UPDATE, true, new DefaultConsumer(ch) { 
      public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { 
       System.out.println("Received " + new String(body)); 
       latch.countDown(); 
      } 
     }); 

     latch.await(); 
     conn.close();   
     System.out.println("Finished"); 
} 

private static void publish(Channel ch, int priority) throws Exception { 
    BasicProperties props = MessageProperties.PERSISTENT_BASIC.builder().priority(priority).build(); 
    String body = QUEUE_UPDATE + " message with priority " + priority ; 
    ch.basicPublish("", QUEUE_UPDATE, props, body.getBytes());   
} 

正確的輸出:

Received update-queue message with priority 250 
Received update-queue message with priority 141 
Finished 

增加了一個隊列消息

  publish(ch, 141);  
     publish(ch, 250); 
     publish(ch, 110); // newly added 

預期輸出

Received update-queue message with priority 250 
Received update-queue message with priority 141 
Received update-queue message with priority 110 
Finished 

實際輸出

Received update-queue message with priority 141 
Received update-queue message with priority 250 
Received update-queue message with priority 110 
Finished 

這是怎麼回事?我做錯了什麼?

+0

刪除隊列並嘗試使用:'args.put(「x-max-priority」,250);' – Gabriele

+0

對於遇到此問題的C#開發人員,請參閱http://stackoverflow.com/questions/29221020/rabbitmq-3-5-and-message-priority/38999171#38999171。簡而言之,「push api」不尊重.Priority。 – granadaCoder

回答

2

我遇到了同樣的問題。對我而言有效的是定義一個由consumer prefetch定義的限制,例如channel.basicQos(1);。 如果您沒有設置此限制,那麼消息會在消費者到達代理時傳遞給消費者,因此它們不會使用優先級進行排序。

當您設置一個下限時,代理一次不會發送比此限制更多的消息,從而在交付之前對消息進行排序。

+0

非常感謝Toresan!如果我可以將它標記爲正確答案,它就可以工作100%! –