2014-08-29 125 views
1

我是ActiveMQ的新手。我試圖在activemq中實現生產者 - 消費者(發送者 - 接收者)。在我的代碼中,我很容易通過ActiveMQ發送從單個生產者到單個消費者的消息&。但問題是,我無法將信息發送給同一製作人的多個消費者。向ActiveMQ中的多個消費者發送消息

這裏是我的製片 & 消費者類。

MsgProducer.java

package jms_service; 

import javax.jms.JMSException; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import org.apache.activemq.ActiveMQConnectionFactory; 

public class MsgProducer { 

     private static String url = "failover://tcp://localhost:61616"; 
     public static javax.jms.ConnectionFactory connFactory; 
     public static javax.jms.Connection connection; 
     public static javax.jms.Session mqSession; 
     public static javax.jms.Topic topic; 
     public static javax.jms.MessageProducer producer; 

     public static void main(String[] args) throws JMSException { 

      connFactory = new ActiveMQConnectionFactory(url); 
      connection = connFactory.createConnection("system","manager"); 
      connection.start(); 
      mqSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 

      topic = mqSession.createTopic("RealTimeData"); 
      producer = mqSession.createProducer(topic);     
      producer.setTimeToLive(30000); 

      TextMessage message = mqSession.createTextMessage();  

      int seq_id =1; 

      while(true) 
      {    
       message.setText("Hello world | " +"seq_id #"+seq_id);    
       producer.send(message); 
       seq_id++; 

       System.out.println("sent_msg =>> "+ message.getText()); 
       // if(seq_id>100000) break; 

        try { 
         Thread.sleep(1000); 
         } 
        catch (InterruptedException e) { e.printStackTrace();}   
       }  

    } 

} 

MsgConsumer.java

package jms_service; 

import java.text.SimpleDateFormat; 
import java.util.Calendar; 

import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import org.apache.activemq.ActiveMQConnectionFactory; 

public class MsgConsumer { 

      private static String url = "failover://tcp://localhost:61616";  
      public static javax.jms.ConnectionFactory connFactory; 
      public static javax.jms.Connection connection; 
      public static javax.jms.Session mqSession; 
      public static javax.jms.Topic topic; 
      public static javax.jms.MessageConsumer consumer; 

     public static void main(String[] args) throws JMSException, InterruptedException { 

      connFactory = new ActiveMQConnectionFactory(url); 
      connection = connFactory.createConnection("system", "manager"); 
      connection.setClientID("0002"); 
      //connection.start();    
      mqSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 
      topic = mqSession.createTopic("RealTimeData"); 
      consumer = mqSession.createDurableSubscriber(topic, "SUBS01"); 
      connection.start(); 

      MessageListener listner = new MessageListener() { 
       public void onMessage(Message message) { 
        try { 
         if (message instanceof TextMessage) { 
          TextMessage txtmsg = (TextMessage) message; 
          Calendar cal = Calendar.getInstance(); 
          //cal.getTime(); 
          SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); 
          String time = sdf.format(cal.getTime()); 

          String msg="received_message =>> "+ txtmsg.getText() + " | received_at :: "+time; 
          System.out.println(msg); 

          //consumer.sendData(msg); 
         } 

         } catch (JMSException e) { 
          System.out.println("Caught:" + e); 
          e.printStackTrace(); 
          } 
        } 
      }; 

      consumer.setMessageListener(listner); 

     } 


} 

誰能幫助找出發送消息給多個消費者的方式。 在此先感謝。

+0

究竟是什麼問題? – 2014-08-29 08:07:49

+0

你有一個硬編碼的客戶端ID ...我承認我不知道ActiveMQ,但我可以想象這是一個原因。 – Fildor 2014-08-29 08:27:37

+0

「如果在調用此方法時具有相同clientID的另一個連接已在運行,則JMS提供程序應檢測到重複ID並引發InvalidClientIDException。」 – Fildor 2014-08-29 08:30:26

回答

1

假設你的問題是

誰能幫助來發送消息給多個消費者

,並找出路,而無需通過您的完整代碼閱讀,這種做法可能是把你的客戶在一個集合

static Vector<consumer> vecConsumer; 

在哪裏你把每一個新的客戶端,並給所有現有的客戶參考。 廣播就像發送至單個客戶端,封裝在,對於一個示例,循環foreach

for(consumer cons : vecConsumer) 
{ 
     //send stuff or put in sending queue 
} 
+0

我承認,我不知道ActiveMQ,但我希望有一個消息隊列來處理這件事。 – Fildor 2014-08-29 08:26:03

+0

@Fildor由於JMS消息隊列對於點對點連接(或多或少)的事實,我想到了一個更通用的方法;) – 2014-08-29 09:47:15

2

隊列語義跨所有消費者遞送消息一次一次和僅-。這是根據JMS規範(瞭解基本知識的一個很好的閱讀)。

主題語義向每個消費者傳遞消息。所以,一個主題可能是您的需求的答案。