2014-10-11 63 views
1

我想創建一個服務器來處理來自用戶的套接字連接,並且在我的服務器中,我希望每個連接都有一個到rabbitmq的連接,但是在他們的網頁提供的示例中我只看到「while」循環等待消息,在這種情況下,我將需要爲每個連接創建一個線程,以便處理來自rabbitmq的消息。java如何處理rabbitmq消息的回調

有沒有辦法在java中使用spring或任何框架來做到這一點,我只是爲rabbitmq而不是使用while循環創建回調?我正在使用node.js,這是非常簡單的, ,我想知道一些關於java的建議。

回答

1

您應該在Channel.basicConsume和DefaultConsumer抽象類看一看:https://www.rabbitmq.com/api-guide.html#consuming

Java併發需要回調來處理每個消息線程,但你可以使用一個線程池來重用線程。

static final ExecutorService threadPool; 

static { 
    threadPool = Executors.newCachedThreadPool(); 
} 

現在,你需要創建一個消費者,將通過創建將被傳遞到線程池來執行一個Runnable實例句柄每次交貨。

channel.basicConsume(queueName, false, new DefaultConsumer(channel) { 
    @Override 
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
     final byte msgBody = body; // a 'final' copy of the body that you can pass to the runnable 
     final long msgTag = envelope.getDeliveryTag(); 
     Runnable runnable = new Runnable() { 
      @Override 
      public void run() { 
       // handle the message here 
       doStuff(msgBody); 
       channel.basicAck(msgTag, false); 
      } 
     }; 
     threadPool.submit(runnable); 
    } 
}); 

這說明了如何在單個線程的單一連接和頻道不while循環將被阻止在每個交貨處理併發交付。爲了您的理智,您可能希望將您的Runnable實現分解到它自己的類中,該類可以接受channel,msgBody,msgTag和任何其他數據作爲調用run()方法時可訪問的參數。

+0

非常感謝您的幫助。 – crawlero 2014-10-12 22:24:32