2013-06-18 117 views
0

我目前正在使用NMS來開發基於應用程序的ActiveMQ(5.6)。ActiveMQ:多用戶連接到一個隊列,但只有一個用戶接收所有消息

我們有幾個消費者(exe)試圖從相同的隊列(而不是主題)接收大量數據。雖然所有的信息都傳給了一位消費者,儘管我已經讓消費者在收到消息之後睡了幾秒鐘。順便說一下,我們不希望消費者收到其他消費者收到的相同消息。

在官方網站中提到,我們應該設置預取限制,以決定在任何時間點可以向消費者流式傳輸多少條消息。它可以被配置和編碼。

我嘗試過的一種方法是使用PrefetchPolicy類來綁定ConnectionFactory類,如下圖所示。

PrefetchPolicy poli = new PrefetchPolicy(); 
poli.QueuePrefetch = 0; 
ConnectionFactory fac = new ConnectionFactory("activemq:tcp://Localhost:61616?jms.prefetchPolicy.queuePrefetch=1"); 
fac.PrefetchPolicy = poli; 
using (IConnection con = fac.CreateConnection()) 
{ 
    using (ISession se = con.CreateSession()) 
    { 
     IDestination destination = SessionUtil.GetDestination(se, queue, DestinationType.Queue); 
     using (IMessageConsumer consumer = se.CreateConsumer(queue1)) 
     { 
      con.Start(); 
      while (true) 
      { 
       ITextMessage message = consumer.Receive() as ITextMessage; 
       Thread.Sleep(2000); 
       if (message != null) 
       { 
       Task.Factory.StartNew(() => extractAndSend(message.Text)); //do something 
       } 
       else 
       { 
       Console.WriteLine("No message received~"); 
       } 
     } 
     } 
    } 
} 

但是不管設置什麼預取值,消費者的行爲都和以前一樣。

我嘗試了第二種方式來獲得結果,即配置服務器conf文件。我更改了服務器的activemq.xml,如下圖所示。 「producerFlowControl =」 真 「的memoryLimit = 」5MB「/> 」producerFlowControl =「 真」 的memoryLimit = 「5MB」> 不過,雖然我已經設置了dispatchpolicy的消息仍然發給一位消費者。

我想知道: 只需配置服務器xml文件以使所有消費者都能從一個隊列中接收消息,是否可以實現此行爲?如果是這樣,如何配置這個以及我的配置有什麼問題?如果不是,我如何使用代碼來實現目標? 謝謝。

回答

2

看看「消息組」功能。

我有同樣的問題。只有一個消費者處理所有消息。我在我的代碼中發現我送過程中使用的組頭:

request.Properties["NMSXGroupID"] = "cheese"; 

根據官方文檔:

標準JMS的JMSXGroupID頭用來定義該消息屬於該消息組 。然後,消息組功能會確保同一消息組的所有消息都會發送到同一個JMS 消費者 - 而該消費者仍然活着。只要消費者 死亡另一個將被選中。

查看完整的細節在http://activemq.apache.org/message-groups.html

相關問題