2017-04-21 82 views
0

如何使用ActiveMQ的持久連接/會話把消息入隊到不同的隊列?(ActiveMQ生產者:一個會話對應多個隊列)

我做了什麼:

/**
 * JMS helper from the question: lazily opens one ActiveMQ connection and session and
 * sends object or text messages through a single {@link MessageProducer}.
 *
 * NOTE(review): nothing in this class ever closes the connection, session or producer.
 */
public class ActiveMQProducer { 

    private static final Logger LOGGER = Logger.getLogger(ActiveMQProducer.class); 
    private Connection connection; 
    // Single producer, bound to the queue passed to the first setupActiveMQ() call.
    private MessageProducer producer; 
    private Session session; 
    // Broker URL, read from configuration in the constructor.
    String activeMQConnection; 

    /**
     * Reads the broker URL from the "active.mq.url" property. No connection is opened here.
     */
    public ActiveMQProducer() throws ConfigurationException, JMSException { 
     activeMQConnection = ActiveMQPropertyManagerFactory.getInstance().getString("active.mq.url"); 
    } 

    /**
     * Opens and starts a new connection, creates a non-transacted AUTO_ACKNOWLEDGE session
     * and a producer for {@code queueName}, overwriting the fields without closing any
     * previously held connection.
     *
     * @param queueName name of the queue the producer is bound to
     * @throws JMSException if the connection, session or producer cannot be created
     */
    public void setupActiveMQ(String queueName) throws JMSException { 

     ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(activeMQConnection); 
     factory.setRejectedTaskHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 

     connection = factory.createConnection(); 
     connection.start(); 

     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     Queue queue = session.createQueue(queueName); 
     producer = session.createProducer(queue); 

    } 

    /**
     * Calls {@link #setupActiveMQ(String)} only while the connection or session is still null.
     *
     * NOTE(review): once both are set, {@code queueName} is ignored, so the producer stays
     * bound to the queue of the first call.
     *
     * @param queueName queue to bind the producer to on first use
     * @throws JMSException if setup fails
     */
    public void getConnection(String queueName) throws JMSException { 

     if (connection == null || session == null) { 
      // NOTE(review): the lock object is created fresh on every call, so this
      // synchronized block gives no mutual exclusion between threads.
      Object object = new Object(); 
      synchronized (object) { 
       setupActiveMQ(queueName); 
      } 
     } 
    } 

    /**
     * Sends {@code t} as an ObjectMessage through the shared producer.
     *
     * @param queueName only used if the connection has not been set up yet
     * @param t         serializable payload
     * @return always {@code null}
     * @throws JMSException if setup or sending fails
     */
    public <T extends Serializable> T sendToActiveMQ(String queueName, T t) throws JMSException { 
     getConnection(queueName); 
     ObjectMessage message = session.createObjectMessage(t); 
     producer.send(message); 
     return null; 
    } 

    /**
     * Sends {@code message} as a TextMessage through the shared producer.
     *
     * @param queueName only used if the connection has not been set up yet
     * @param message   text payload
     * @throws JMSException if setup or sending fails
     */
    public void sendMessageToActiveMQ(String queueName, String message) throws JMSException { 
     getConnection(queueName); 
     TextMessage toSend = session.createTextMessage(message); 
     producer.send(toSend); 
    } 
} 

我用這個類把消息發送到不同的隊列,最終ActiveMQ的連接被耗盡,因爲我從來沒有關閉連接或會話:

org.apache.activemq.transport.tcp.ExceededMaximumConnectionsException: Exceeded the maximum number of allowed client connections. 

處理這個問題的正確方法是什麼?我有大約5個隊列,需要向它們發送不同的消息。我應該每次打開一個新連接、入隊後再關閉連接,還是有辦法讓會話/連接保持持久?

謝謝。

回答

2

這裏有一些解決方案:

每個目的地1個生產者:

import java.io.Serializable; 
import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.concurrent.ThreadPoolExecutor; 

import javax.jms.Connection; 
import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.ObjectMessage; 
import javax.jms.Queue; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ConfigurationException; 
import org.apache.log4j.Logger; 

/**
 * Sends messages to several ActiveMQ queues over one long-lived connection and session,
 * caching one {@link MessageProducer} per queue name.
 *
 * <p>A JMS {@link Session} and the producers created from it must not be used by several
 * threads at once, so every method that touches them is {@code synchronized} on this
 * instance. {@link #close()} is synchronized too and therefore waits for an in-flight send.
 *
 * <p>The connection is closed by a JVM shutdown hook registered in the constructor; call
 * {@link #close()} to release it earlier. After {@code close()} the next send reconnects.
 */
public class ActiveMQProducer {

    private static final Logger LOGGER = Logger.getLogger(ActiveMQProducer.class);
    private Connection connection;
    private Session session;
    String activeMQConnection;
    // Producers are bound to the current session; the cache is emptied whenever it is closed.
    Map<String, MessageProducer> producers = Collections.synchronizedMap(new HashMap<String, MessageProducer>());
    Thread shutdownHook = new Thread(new Runnable() {
        @Override
        public void run() {
            close();
        }
    });

    /**
     * Reads the broker URL from the "active.mq.url" property, connects, and registers the
     * shutdown hook that closes the connection on JVM exit.
     *
     * @throws ConfigurationException declared for the configuration lookup
     * @throws JMSException if the connection or session cannot be created
     */
    public ActiveMQProducer() throws ConfigurationException, JMSException {
        activeMQConnection = ActiveMQPropertyManagerFactory.getInstance().getString("active.mq.url");
        setupActiveMQ();
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /**
     * Closes any existing connection and opens a fresh connection and session.
     *
     * @throws JMSException if the connection or session cannot be created; in that case no
     *         half-open connection is kept
     */
    public synchronized void setupActiveMQ() throws JMSException {
        close();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(activeMQConnection);
        factory.setRejectedTaskHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        Connection newConnection = factory.createConnection();
        try {
            newConnection.start();
            Session newSession = newConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Publish both fields together so connection and session always belong to each other.
            connection = newConnection;
            session = newSession;
        } catch (JMSException e) {
            closeQuietly(newConnection);
            throw e;
        }
    }

    /**
     * Last-resort cleanup. The registered shutdown hook keeps a reference to this instance,
     * so this normally only runs for instances whose constructor failed before registration.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            close();
        } finally {
            super.finalize();
        }
    }

    /**
     * Closes the connection (which also closes its session and producers) and drops all
     * cached state. Safe to call repeatedly; failures are logged, not thrown.
     */
    public synchronized void close() {
        Connection toClose = connection;
        connection = null;
        session = null;
        // Cached producers belong to the session being closed and must not be reused.
        producers.clear();
        if (toClose != null) {
            closeQuietly(toClose);
        }
    }

    /**
     * Ensures an open connection and session exist, reconnecting if necessary.
     *
     * @throws JMSException if reconnecting fails
     */
    public synchronized void getConnection() throws JMSException {
        if (connection == null || session == null) {
            setupActiveMQ();
        }
    }

    /**
     * Returns the cached producer for {@code queueName}, creating it on first use.
     *
     * @param queueName name of the destination queue
     * @return a producer bound to that queue on the current session
     * @throws JMSException if the connection, queue or producer cannot be created
     */
    public synchronized MessageProducer getProducer(String queueName) throws JMSException {
        getConnection();
        MessageProducer producer = producers.get(queueName);
        if (producer == null) {
            Queue queue = session.createQueue(queueName);
            producer = session.createProducer(queue);
            producers.put(queueName, producer);
        }
        return producer;
    }

    /**
     * Sends {@code t} to {@code queueName} as an ObjectMessage.
     *
     * @param queueName name of the destination queue
     * @param t         serializable payload
     * @return always {@code null} (kept for compatibility with existing callers)
     * @throws JMSException if connecting or sending fails
     */
    public synchronized <T extends Serializable> T sendToActiveMQ(String queueName, T t) throws JMSException {
        MessageProducer producer = getProducer(queueName);
        ObjectMessage message = session.createObjectMessage(t);
        producer.send(message);
        return null;
    }

    /**
     * Sends {@code message} to {@code queueName} as a TextMessage.
     *
     * @param queueName name of the destination queue
     * @param message   text payload
     * @throws JMSException if connecting or sending fails
     */
    public synchronized void sendMessageToActiveMQ(String queueName, String message) throws JMSException {
        MessageProducer producer = getProducer(queueName);
        TextMessage toSend = session.createTextMessage(message);
        producer.send(toSend);
    }

    /** Best-effort close: cleanup must never throw, but the failure is logged. */
    private void closeQuietly(Connection toClose) {
        try {
            toClose.close();
        } catch (Exception e) {
            LOGGER.warn("Failed to close ActiveMQ connection", e);
        }
    }
}

所有目的地共用1個生產者:

import java.io.Serializable; 
import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.concurrent.ThreadPoolExecutor; 

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.ObjectMessage; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ConfigurationException; 
import org.apache.log4j.Logger; 

/**
 * Sends messages to several ActiveMQ queues over one long-lived connection and session,
 * using a single anonymous {@link MessageProducer} and passing the destination on each send.
 *
 * <p>A JMS {@link Session} and its producer must not be used by several threads at once,
 * so every method that touches them is {@code synchronized} on this instance.
 * {@link #close()} is synchronized too and therefore waits for an in-flight send.
 *
 * <p>The connection is closed by a JVM shutdown hook registered in the constructor; call
 * {@link #close()} to release it earlier. After {@code close()} the next send reconnects.
 */
public class ActiveMQProducer2 {

    private static final Logger LOGGER = Logger.getLogger(ActiveMQProducer2.class);
    private Connection connection;
    private Session session;
    String activeMQConnection;
    Map<String, Destination> destinations = Collections.synchronizedMap(new HashMap<String, Destination>());
    private MessageProducer producer;
    Thread shutdownHook = new Thread(new Runnable() {
        @Override
        public void run() {
            close();
        }
    });

    /**
     * Reads the broker URL from the "active.mq.url" property, connects, and registers the
     * shutdown hook that closes the connection on JVM exit.
     *
     * @throws ConfigurationException declared for the configuration lookup
     * @throws JMSException if the connection, session or producer cannot be created
     */
    public ActiveMQProducer2() throws ConfigurationException, JMSException {
        activeMQConnection = ActiveMQPropertyManagerFactory.getInstance().getString("active.mq.url");
        setupActiveMQ();
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /**
     * Closes any existing connection and opens a fresh connection, session and producer.
     *
     * @throws JMSException if any of them cannot be created; in that case no half-open
     *         connection is kept
     */
    public synchronized void setupActiveMQ() throws JMSException {
        close();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(activeMQConnection);
        factory.setRejectedTaskHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        Connection newConnection = factory.createConnection();
        try {
            newConnection.start();
            Session newSession = newConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // A null destination creates an anonymous producer. A producer created with a
            // destination must reject send(destination, message) with
            // UnsupportedOperationException, so it could not serve several queues.
            MessageProducer newProducer = newSession.createProducer(null);
            // Publish all fields together so they always belong to the same connection.
            connection = newConnection;
            session = newSession;
            producer = newProducer;
        } catch (JMSException e) {
            closeQuietly(newConnection);
            throw e;
        }
    }

    /**
     * Last-resort cleanup. The registered shutdown hook keeps a reference to this instance,
     * so this normally only runs for instances whose constructor failed before registration.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            close();
        } finally {
            super.finalize();
        }
    }

    /**
     * Closes the connection (which also closes its session and producer) and drops the
     * references to them. Cached destinations are kept, as before. Safe to call repeatedly;
     * failures are logged, not thrown.
     */
    public synchronized void close() {
        Connection toClose = connection;
        connection = null;
        session = null;
        producer = null;
        if (toClose != null) {
            closeQuietly(toClose);
        }
    }

    /**
     * Ensures an open connection, session and producer exist, reconnecting if necessary.
     *
     * @throws JMSException if reconnecting fails
     */
    public synchronized void getConnection() throws JMSException {
        if (connection == null || session == null) {
            setupActiveMQ();
        }
    }

    /**
     * Returns the cached destination for {@code queueName}, creating it on first use.
     *
     * @param queueName name of the destination queue
     * @return the queue destination
     * @throws JMSException if the connection or queue cannot be created
     */
    public synchronized Destination getDestination(String queueName) throws JMSException {
        getConnection();
        Destination destination = destinations.get(queueName);
        if (destination == null) {
            destination = session.createQueue(queueName);
            destinations.put(queueName, destination);
        }
        return destination;
    }

    /**
     * Sends {@code t} to {@code queueName} as an ObjectMessage.
     *
     * @param queueName name of the destination queue
     * @param t         serializable payload
     * @return always {@code null} (kept for compatibility with existing callers)
     * @throws JMSException if connecting or sending fails
     */
    public synchronized <T extends Serializable> T sendToActiveMQ(String queueName, T t) throws JMSException {
        Destination destination = getDestination(queueName);
        ObjectMessage message = session.createObjectMessage(t);
        producer.send(destination, message);
        return null;
    }

    /**
     * Sends {@code message} to {@code queueName} as a TextMessage.
     *
     * @param queueName name of the destination queue
     * @param message   text payload
     * @throws JMSException if connecting or sending fails
     */
    public synchronized void sendMessageToActiveMQ(String queueName, String message) throws JMSException {
        Destination destination = getDestination(queueName);
        TextMessage toSend = session.createTextMessage(message);
        producer.send(destination, toSend);
    }

    /** Best-effort close: cleanup must never throw, but the failure is logged. */
    private void closeQuietly(Connection toClose) {
        try {
            toClose.close();
        } catch (Exception e) {
            LOGGER.warn("Failed to close ActiveMQ connection", e);
        }
    }
}
+0

神奇的解決方案。謝謝。 – Asiriel

相關問題