2011-10-06 83 views
0

我一直在嘗試使用 JMS 虛擬主題(Virtual Topic)。我想讓範例程式正常運作,但遇到了一些問題。(標題:配置虛擬主題:JMS - Java)

我採用了ActiveMQ隨附的示例並使用虛擬主題對其進行了修改。

TopicPublisher:

/** 
* Licensed to the Apache Software Foundation (ASF) under one or more 
* contributor license agreements. See the NOTICE file distributed with 
* this work for additional information regarding copyright ownership. 
* The ASF licenses this file to You under the Apache License, Version 2.0 
* (the "License"); you may not use this file except in compliance with 
* the License. You may obtain a copy of the License at 
* 
*  http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, 
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and 
* limitations under the License. 
*/ 
import java.util.Arrays; 

import javax.jms.BytesMessage; 
import javax.jms.Connection; 
import javax.jms.DeliveryMode; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.jms.Topic; 

import org.apache.activemq.ActiveMQConnectionFactory; 

/** 
* Use in conjunction with TopicListener to test the performance of ActiveMQ 
* Topics. 
*/ 
public class TopicPublisher implements MessageListener { 

    private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray(); 

    private final Object mutex = new Object(); 
    private Connection connection; 
    private Session session; 
    private MessageProducer publisher; 
    private Topic topic; 
    private Topic control; 

    private String url = "tcp://localhost:61616"; 
    private int size = 256; 
    private int subscribers = 1; 
    private int remaining; 
    private int messages = 10000; 
    private long delay; 
    private int batch = 2000; 

    private byte[] payload; 

    public static void main(String[] argv) throws Exception { 
     TopicPublisher p = new TopicPublisher(); 
     String[] unknown = CommandLineSupport.setOptions(p, argv); 
     if (unknown.length > 0) { 
      System.out.println("Unknown options: " + Arrays.toString(unknown)); 
      System.exit(-1); 
     } 
     p.run(); 
    } 

    private void run() throws Exception { 
     ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); 
     connection = factory.createConnection(); 
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     topic = session.createTopic("VirtualTopic.Orders"); 
     control = session.createTopic("VirtualTopic.control"); 

     publisher = session.createProducer(topic); 
     publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 

     payload = new byte[size]; 
     for (int i = 0; i < size; i++) { 
      payload[i] = (byte)DATA[i % DATA.length]; 
     } 

     session.createConsumer(control).setMessageListener(this); 
     connection.start(); 

     long[] times = new long[batch]; 
     for (int i = 0; i < batch; i++) { 
      if (i > 0) { 
       Thread.sleep(delay * 1000); 
      } 
      times[i] = batch(messages); 
      System.out.println("Batch " + (i + 1) + " of " + batch + " completed in " + times[i] + " ms."); 
     } 

     long min = min(times); 
     long max = max(times); 
     System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max)); 

     // request shutdown 
     publisher.send(session.createTextMessage("SHUTDOWN")); 

     connection.stop(); 
     connection.close(); 
    } 

    private long batch(int msgCount) throws Exception { 
     long start = System.currentTimeMillis(); 
     remaining = subscribers; 
     publish(); 
     waitForCompletion(); 
     return System.currentTimeMillis() - start; 
    } 

    private void publish() throws Exception { 

     // send events 
     BytesMessage msg = session.createBytesMessage(); 
     msg.writeBytes(payload); 
     for (int i = 0; i < messages; i++) { 
      publisher.send(msg); 
      if ((i + 1) % 1000 == 0) { 
       System.out.println("Sent " + (i + 1) + " messages"); 
      } 
     } 

     // request report 
     publisher.send(session.createTextMessage("REPORT")); 
    } 

    private void waitForCompletion() throws Exception { 
     System.out.println("Waiting for completion..."); 
     synchronized (mutex) { 
      while (remaining > 0) { 
       mutex.wait(); 
      } 
     } 
    } 

    public void onMessage(Message message) { 
     synchronized (mutex) { 
      System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining"); 
      if (remaining == 0) { 
       mutex.notify(); 
      } 
     } 
    } 

    Object getReport(Message m) { 
     try { 
      return ((TextMessage)m).getText(); 
     } catch (JMSException e) { 
      e.printStackTrace(System.out); 
      return e.toString(); 
     } 
    } 

    static long min(long[] times) { 
     long min = times.length > 0 ? times[0] : 0; 
     for (int i = 0; i < times.length; i++) { 
      min = Math.min(min, times[i]); 
     } 
     return min; 
    } 

    static long max(long[] times) { 
     long max = times.length > 0 ? times[0] : 0; 
     for (int i = 0; i < times.length; i++) { 
      max = Math.max(max, times[i]); 
     } 
     return max; 
    } 

    static long avg(long[] times, long min, long max) { 
     long sum = 0; 
     for (int i = 0; i < times.length; i++) { 
      sum += times[i]; 
     } 
     sum -= min; 
     sum -= max; 
     return sum/times.length - 2; 
    } 

    public void setBatch(int batch) { 
     this.batch = batch; 
    } 

    public void setDelay(long delay) { 
     this.delay = delay; 
    } 

    public void setMessages(int messages) { 
     this.messages = messages; 
    } 

    public void setSize(int size) { 
     this.size = size; 
    } 

    public void setSubscribers(int subscribers) { 
     this.subscribers = subscribers; 
    } 

    public void setUrl(String url) { 
     this.url = url; 
    } 
} 

TopicListener:

/** 
* Licensed to the Apache Software Foundation (ASF) under one or more 
* contributor license agreements. See the NOTICE file distributed with 
* this work for additional information regarding copyright ownership. 
* The ASF licenses this file to You under the Apache License, Version 2.0 
* (the "License"); you may not use this file except in compliance with 
* the License. You may obtain a copy of the License at 
* 
*  http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, 
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and 
* limitations under the License. 
*/ 
import java.util.Arrays; 

import javax.jms.Connection; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.jms.Topic; 

import org.apache.activemq.ActiveMQConnectionFactory; 

/** 
* Use in conjunction with TopicPublisher to test the performance of ActiveMQ 
* Topics. 
*/ 
public class TopicListener implements MessageListener { 

    private Connection connection; 
    private MessageProducer producer; 
    private Session session; 
    private int count; 
    private long start; 
    private Topic topic; 
    private Topic control; 

    private String url = "tcp://localhost:61616"; 

    public static void main(String[] argv) throws Exception { 
     TopicListener l = new TopicListener(); 
     String[] unknown = CommandLineSupport.setOptions(l, argv); 
     if (unknown.length > 0) { 
      System.out.println("Unknown options: " + Arrays.toString(unknown)); 
      System.exit(-1); 
     } 
     l.run(); 
    } 

    public void run() throws JMSException { 
     ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); 
     connection = factory.createConnection(); 
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     topic = session.createTopic("Consumer.A.VirtualTopic.Orders"); 
     control = session.createTopic("Consumer.A.VirtualTopic.control"); 

     MessageConsumer consumer = session.createConsumer(topic); 
     consumer.setMessageListener(this); 

     connection.start(); 

     producer = session.createProducer(control); 
     System.out.println("Waiting for messages..."); 
    } 

    private static boolean checkText(Message m, String s) { 
     try { 
      return m instanceof TextMessage && ((TextMessage)m).getText().equals(s); 
     } catch (JMSException e) { 
      e.printStackTrace(System.out); 
      return false; 
     } 
    } 

    public void onMessage(Message message) { 
     if (checkText(message, "SHUTDOWN")) { 

      try { 
       connection.close(); 
      } catch (Exception e) { 
       e.printStackTrace(System.out); 
      } 

     } else if (checkText(message, "REPORT")) { 
      // send a report: 
      try { 
       long time = System.currentTimeMillis() - start; 
       String msg = "Received " + count + " in " + time + "ms"; 
       producer.send(session.createTextMessage(msg)); 
      } catch (Exception e) { 
       e.printStackTrace(System.out); 
      } 
      count = 0; 

     } else { 

      if (count == 0) { 
       start = System.currentTimeMillis(); 
      } 

      if (++count % 1000 == 0) { 
       System.out.println("Received " + count + " messages."); 
      } 
     } 
    } 

    public void setUrl(String url) { 
     this.url = url; 
    } 

} 

瀏覽主題時顯示如下:

<topic name="VirtualTopic.Orders"><stats size="0" consumerCount="0" enqueueCount="2001" dequeueCount="0"/></topic> 


<topic name="Consumer.A.VirtualTopic.Orders"><stats size="0" consumerCount="1" enqueueCount="0" dequeueCount="0"/></topic> 

監聽器完全沒有消費到任何消息。在監聽器中,如果我將主題名稱指定爲VirtualTopic.Orders,那麼監聽器就能夠收到消息。

我在這裏做錯了什麼?

回答

2

您的訂閱被設置成了名爲「Consumer.A.VirtualTopic.Orders」的主題(Topic)。虛擬主題的訂閱端應該是一個隊列(Queue)。

+0

我會試試看,並儘快告訴你結果。 –

+0

不過這並沒有解決問題。我想知道是否需要對activemq.xml進行任何更改。 –

+0

Jake,您說得對,但我必須對activemq.xml進行一處更改。我原以爲虛擬主題是開箱即用的功能,但我必須將下面這行添加到activemq.xml,並更改我的消費者目的地名稱: <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>