2010-12-23 68 views
7

大家好,請給新手一些關於ActiveMQ和JMS的基礎知識。還有配置步驟。ActiveMQ和JMS:新手基本步驟

+3

致投反對票的人:這個問題是許多年前問的,我知道這是一個低質量的問題,但不至於被投反對票。至少請在這裏留下您的評論。不要躲起來從後門偷偷投反對票.. – 2013-08-05 11:32:27

+0

你有沒有找到任何逐步指導的ActiveMQ安裝示例? – 2013-08-30 12:50:47

回答

16

我們將使用多線程創建基於控制檯的應用程序。因此爲控制檯應用程序創建一個Java項目。

現在,請按照下列步驟..........

  1. 將javax.jms.jar、activemq-all-5.3.0.jar、log4j-1.2.15.jar加入你的項目庫。(你可以從http://www.jarfinder.com/下載上述所有的jar文件。)

  2. 創建一個名爲jndi.properties的文件,並粘貼以下文本。(關於jndi.properties的詳細信息,請自行谷歌搜索。)


# START SNIPPET: jndi 
# JNDI settings for the sample application. The Java classes in this example
# create a javax.naming.InitialContext and look up "queueConnectionFactory"
# and "MyQueue"; both names are registered below.

# Initial context factory class supplied by ActiveMQ.
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory 

# use the following property to configure the default connector 
# (broker URL; presumably a broker must be listening on this host/port - verify)
java.naming.provider.url = tcp://localhost:61616 

# use the following property to specify the JNDI name the connection factory 
# should appear as. 
# NOTE(review): "topicConnectionFactry" looks like a misspelling of
# "topicConnectionFactory". It is left untouched because it is a runtime
# lookup name; nothing visible in this example looks it up.
connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry 

# register some queues in JNDI using the form 
# queue.[jndiName] = [physicalName] 
# "MyQueue" is the JNDI name the example consumer looks up.
queue.MyQueue = example.MyQueue 


# register some topics in JNDI using the form 
# topic.[jndiName] = [physicalName] 
topic.MyTopic = example.MyTopic 

# END SNIPPET: jndi 

添加JMSConsumer.java


import javax.jms.*; 
import javax.naming.Context; 
import javax.naming.InitialContext; 
import javax.naming.NamingException; 
import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 

/**
 * Runnable that consumes messages from the JNDI-registered destination
 * {@code "MyQueue"}.
 *
 * <p>When run, it creates a JNDI {@link InitialContext}, looks up the
 * {@code "queueConnectionFactory"} connection factory and the destination,
 * opens a non-transacted, auto-acknowledging session, registers a
 * {@link MyQueueMessageListener} on a consumer and then keeps the connection
 * open for a fixed period before closing it.
 */
public class JMSConsumer implements Runnable {
    private static final Log LOG = LogFactory.getLog(JMSConsumer.class);

    /** JNDI name of the connection factory to look up. */
    private static final String CONNECTION_FACTORY_NAME = "queueConnectionFactory";

    /** JNDI name of the destination to consume from. */
    private static final String SOURCE_NAME = "MyQueue";

    /** Delay between starting the connection and registering the listener. */
    private static final long STARTUP_DELAY_MILLIS = 2000L;

    /** How long the listener is left registered before the connection is closed. */
    private static final long CONSUME_WINDOW_MILLIS = 5000L;

    /**
     * Connects, consumes for a fixed time window and disconnects.
     *
     * <p>A JNDI failure terminates the JVM with exit status 1. A
     * {@link JMSException} is logged and ends the run. If the thread is
     * interrupted while waiting, the interrupt status is restored and the
     * connection is closed early.
     */
    @Override
    public void run() {
        LOG.info("Source name is " + SOURCE_NAME);

        // Create a JNDI API InitialContext object.
        Context jndiContext = null;
        try {
            jndiContext = new InitialContext();
        } catch (NamingException e) {
            LOG.error("Could not create JNDI API context: " + e, e);
            System.exit(1);
        }

        // Look up connection factory and destination.
        ConnectionFactory connectionFactory = null;
        Destination destination = null;
        try {
            connectionFactory = (ConnectionFactory) jndiContext.lookup(CONNECTION_FACTORY_NAME);
            destination = (Destination) jndiContext.lookup(SOURCE_NAME);
        } catch (NamingException e) {
            LOG.error("JNDI API lookup failed: " + e, e);
            System.exit(1);
        }

        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            // false: the session is not transacted.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(destination);
            connection.start();

            if (!pause(STARTUP_DELAY_MILLIS)) {
                return; // interrupted: the finally block still closes the connection
            }

            consumer.setMessageListener(new MyQueueMessageListener());

            // Let the thread run for some time so that the consumer has
            // sufficient time to consume the messages.
            pause(CONSUME_WINDOW_MILLIS);
        } catch (JMSException e) {
            LOG.error("Exception occurred: " + e, e);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    // Nothing more can be done here, but do not hide the failure.
                    LOG.warn("Failed to close JMS connection: " + e, e);
                }
            }
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis time to sleep, in milliseconds
     * @return {@code true} if the full time elapsed, {@code false} if the
     *         thread was interrupted (the interrupt status is restored)
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // Restore the flag so that code further up the stack can see it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting; shutting the consumer down early");
            return false;
        }
    }
}

添加JMSProducer.java


import javax.jms.*; 
import javax.naming.Context; 
import javax.naming.InitialContext; 
import javax.naming.NamingException; 
import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 


public class JMSProducer implements Runnable{ 
private static final Log LOG = LogFactory.getLog(JMSProducer.class); 

public JMSProducer() { 
} 

//Run method implemented to run this as a thread. 
public void run(){ 
Context jndiContext = null; 
ConnectionFactory connectionFactory = null; 
Connection connection = null; 
Session session = null; 
Destination destination = null; 
MessageProducer producer = null; 
String destinationName = null; 
final int numMsgs; 
destinationName = "MyQueue"; 
numMsgs = 5; 
LOG.info("Destination name is " + destinationName); 

/* 
* Create a JNDI API InitialContext object 
*/ 
try { 
    jndiContext = new InitialContext(); 
} catch (NamingException e) { 
    LOG.info("Could not create JNDI API context: " + e.toString()); 
    System.exit(1); 
} 

/* 
* Look up connection factory and destination. 
*/ 
try { 
    connectionFactory = (ConnectionFactory)jndiContext.lookup("queueConnectionFactory"); 
    destination = (Destination)jndiContext.lookup(destinationName); 
} catch (NamingException e) { 
    LOG.info("JNDI API lookup failed: " + e); 
    System.exit(1); 
} 

/* 
* Create connection. Create session from connection; false means 
* session is not transacted.create producer, set the text message, set the co-relation id and send the message. 
*/ 
try { 
    connection = connectionFactory.createConnection(); 
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    producer = session.createProducer(destination); 
    TextMessage message = session.createTextMessage(); 
    for (int i = 0; i

添加MyQueueMessageListener.java


import java.io.*; 
import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 
import javax.jms.*; 


public class MyQueueMessageListener implements MessageListener { 
    private static final Log LOG = LogFactory.getLog(MyQueueMessageListener.class); 
    /** 
    * 
    */ 
    public MyQueueMessageListener() { 
    // TODO Auto-generated constructor stub 
    } 

    /** (non-Javadoc) 
    * @see javax.jms.MessageListener#onMessage(javax.jms.Message) 
    * This is called on receving of a text message. 
    */ 
    public void onMessage(Message arg0) { 
     LOG.info("onMessage() called!"); 
     if(arg0 instanceof TextMessage){ 
      try { 
       //Print it out 
       System.out.println("Recieved message in listener: " + ((TextMessage)arg0).getText()); 

       System.out.println("Co-Rel Id: " + ((TextMessage)arg0).getJMSCorrelationID()); 
       try { 
        //Log it to a file 
        BufferedWriter outFile = new BufferedWriter(new FileWriter("MyQueueConsumer.txt")); 
        outFile.write("Recieved message in listener: " + ((TextMessage)arg0).getText()); 
       } catch (IOException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 
      } catch (JMSException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     }else{ 
      System.out.println("~~~~Listener : Error in message format~~~~"); 
     } 

    } 

    } 

添加SimpleApp.java

 

/**
 * Entry point of the sample: starts a {@code JMSProducer} and a
 * {@code JMSConsumer}, each on its own non-daemon thread.
 */
public class SimpleApp {

    /**
     * Launches the producer thread and then the consumer thread.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for convenience
     */
    public static void main(String[] args) throws Exception {
        // The producer is started first, then the consumer.
        Runnable producerTask = new JMSProducer();
        runInNewthread(producerTask);

        Runnable consumerTask = new JMSConsumer();
        runInNewthread(consumerTask);
    }

    /**
     * Starts the given task on a new non-daemon thread.
     *
     * @param runnable the task to execute
     */
    public static void runInNewthread(Runnable runnable) {
        final Thread worker = new Thread(runnable);
        worker.setDaemon(false);
        worker.start();
    }

}

現在運行SimpleApp.java類。

祝一切順利。編碼愉快。

0

這裏對ActiveMQ和Apache Camel進行簡單的junit測試。這兩項技術在一起工作非常好。

如果您想了解代碼的詳細信息,可以參閱我博客上的一篇文章:

http://ignaciosuay.com/unit-testing-active-mq/

public class ActiveMQTest extends CamelTestSupport { 

    /**
     * Builds the Camel context used by the test and registers an
     * {@code "activemq"} component on it.
     *
     * <p>The component is created by {@code jmsComponentClientAcknowledge}
     * from an {@link ActiveMQConnectionFactory} pointing at the broker URL
     * {@code vm://localhost?broker.persistent=false}.
     *
     * @return the context created by the superclass, with the component added
     * @throws Exception if the superclass fails to create the context
     */
    @Override
    protected CamelContext createCamelContext() throws Exception {
        final String brokerUrl = "vm://localhost?broker.persistent=false";

        final CamelContext context = super.createCamelContext();
        final ConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        context.addComponent("activemq", jmsComponentClientAcknowledge(factory));
        return context;
    }

    /**
     * Defines the two routes under test: one from the Mina TCP endpoint on
     * localhost:6666 to the {@code activemq:processHL7} endpoint, and one
     * from {@code activemq:processHL7} to {@code mock:end}.
     *
     * @return a route builder configuring both routes
     * @throws Exception declared by the overridden method
     */
    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        final RouteBuilder routes = new RouteBuilder() {

            @Override
            public void configure() throws Exception {
                final String queueUri = "activemq:processHL7";

                // TCP endpoint -> ActiveMQ endpoint
                from("mina:tcp://localhost:6666?textline=true&sync=false").to(queueUri);

                // ActiveMQ endpoint -> mock endpoint inspected by the test
                from(queueUri).to("mock:end");
            }
        };
        return routes;
    }

    /**
     * Sends an HL7 message to the Mina TCP endpoint and asserts that the
     * {@code mock:end} endpoint receives exactly that body.
     *
     * @throws Exception if sending fails or the mock expectation is not met
     */
    @Test
    public void testSendHL7Message() throws Exception {
        final String tcpEndpointUri = "mina:tcp://localhost:6666?textline=true&sync=false";

        // Four segments; all but the last end with a carriage return.
        final String hl7Message =
                "MSH|^~\\&|hl7Integration|hl7Integration|||||ADT^A01|||2.5|\r"
                + "EVN|A01|20130617154644\r"
                + "PID|1|465 306 5961||407623|Wood^Patrick^^^MR||19700101|1|\r"
                + "PV1|1||Location||||||||||||||||261938_6_201306171546|||||||||||||||||||||||||20130617134644|";

        final MockEndpoint endpoint = getMockEndpoint("mock:end");
        endpoint.expectedBodiesReceived(hl7Message);

        template.sendBody(tcpEndpointUri, hl7Message);

        endpoint.assertIsSatisfied();
    }