2008-09-29 143 views
9

依JMSCorrelationID過濾的JMS消息接收器

如何在Java(JRE/JDK/J2EE 1.4)中實例化一個JMS隊列偵聽器,使其只接收與給定JMSCorrelationID匹配的消息?我要接收的消息是發佈到隊列而不是主題的,不過如有需要,這一點可以更改。

下面是我目前用來把消息放入隊列的代碼:

/**
 * Publishes a response object to a JMS queue as an {@code ObjectMessage},
 * using the value of {@code response.getID()} as the message's
 * JMSCorrelationID so that a consumer can select it by correlation ID.
 *
 * @param jmsQueueFactory  name under which the queue connection factory is
 *                         looked up in the naming context
 * @param jmsQueue         name under which the target queue is looked up
 * @param response         the response object to publish
 *
 * @throws ServiceLocatorException if the lookups fail or the message cannot
 *                                 be published to the queue
 */
private void publishResponseToQueue(String jmsQueueFactory,
                                    String jmsQueue,
                                    Response response)
        throws ServiceLocatorException {

    if (logger.isInfoEnabled()) {
        logger.info("Begin publishResponseToQueue: " +
                jmsQueueFactory + "," + jmsQueue + "," + response);
    }
    // NOTE(review): assertLog looks like log4j's Category.assertLog, which only
    // logs when the condition is false and does not abort - confirm. Invalid
    // arguments therefore still reach the try block and fail there.
    logger.assertLog(jmsQueue != null && !jmsQueue.equals(""),
            "jmsQueue cannot be null or empty");
    logger.assertLog(jmsQueueFactory != null && !jmsQueueFactory.equals(""),
            "jmsQueueFactory cannot be null or empty");
    logger.assertLog(response != null, "Response cannot be null");

    // Declared outside the try so the finally block can release it on every path.
    QueueConnection connection = null;
    try {

        Queue queue = (Queue) _context.lookup(jmsQueue);

        QueueConnectionFactory factory = (QueueConnectionFactory)
                _context.lookup(jmsQueueFactory);

        connection = factory.createQueueConnection();
        connection.start();
        QueueSession session = connection.createQueueSession(false,
                QueueSession.AUTO_ACKNOWLEDGE);

        ObjectMessage objectMessage = session.createObjectMessage();
        objectMessage.setJMSCorrelationID(response.getID());
        objectMessage.setObject(response);

        session.createSender(queue).send(objectMessage);

    } catch (Exception e) {
        //XC3.2 Added/Modified BEGIN
        String failure = "ServiceLocator.publishResponseToQueue - Could not publish the " +
                "Response to the Queue - " + e.getMessage();
        // Pass the exception itself so the stack trace is not lost.
        logger.error(failure, e);
        throw new ServiceLocatorException(failure);
        //XC3.2 Added/Modified END
    } finally {
        // Closing the connection also closes the sessions and senders created
        // from it, so this single close releases everything, including when
        // an exception was thrown above.
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception closeFailure) {
                // The send outcome is already decided; a failed close is only logged.
                logger.warn("ServiceLocator.publishResponseToQueue - Could not close the " +
                        "queue connection - " + closeFailure.getMessage(), closeFailure);
            }
        }
    }

    if (logger.isInfoEnabled()) {
        logger.info("End publishResponseToQueue: " +
                jmsQueueFactory + "," + jmsQueue + "," + response);
    }

} // end of publishResponseToQueue method

回答

10

隊列連接的設置是一樣的,但取得QueueSession之後,要在創建接收器時設置選擇器(selector)。

// Message selector: only messages whose JMSCorrelationID equals 'theid' are delivered.
String selector = "JMSCorrelationID='theid'";
QueueReceiver receiver = session.createReceiver(myQueue, selector);

然後

// Option 1: synchronous - block until a message matching the selector arrives.
receiver.receive();

// Option 2: asynchronous - register a listener instead. The JMS method is
// setMessageListener; there is no setListener on a message consumer.
receiver.setMessageListener(myListener);
+0

我最近也在閱讀同一主題的資料,有一個問題:接收器是否仍會收到不含所需相關ID的消息,然後不經處理就自動丟棄它們?還是JMS提供程序本身就不會把這些消息傳遞給接收器,因而它們仍留在隊列中?我覺得後者才是正確的做法,但想確認一下。謝謝。 – shrini1000 2011-08-03 04:41:42

+0

@ shrini1000你是對的。 – Trying 2013-10-21 22:21:15

5

順帶一提,雖然這不是你實際問的問題——如果你想通過JMS實現請求/響應,我建議reading this article,因爲JMS API比你想象的要複雜得多,而且高效地做這件事比看起來要困難得多。

特別是,爲了高效地使用JMS(to use JMS efficiently),你應該儘量避免爲單個消息創建消費者等做法。

另外,因爲JMS API要正確而高效地使用非常複雜——特別是涉及池、事務和並行處理時——我建議大家把中間件從應用代碼中隱藏起來(hide the middleware from their application code),例如使用Apache Camel's Spring Remoting implementation for JMS。

0
// Build a selector that matches messages whose JMSCorrelationID equals msg's JMSMessageID.
String messageId = msg.getJMSMessageID();
QueueReceiver receiver = session.createReceiver(queue, "JMSCorrelationID = '" + messageId + "'");

在這裏,接收器將獲得JMSCorrelationID等於該MessageID的消息。這在請求/響應範例中非常有用。

或者,你可以直接將其設置爲任意值:

QueueReceiver receiver = session.createReceiver(queue, "JMSCorrelationID ='"+id+"'";); 

然後你可以調用 receiver.receive(2000); 或 receiver.setMessageListener(this);

2

希望這會有所幫助。我使用了Open MQ。

package com.MQueues; 

import java.util.UUID; 

import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.QueueConnection; 
import javax.jms.QueueReceiver; 
import javax.jms.QueueSession; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import com.sun.messaging.BasicQueue; 
import com.sun.messaging.QueueConnectionFactory; 

public class HelloProducerConsumer { 

public static String queueName = "queue0"; 
public static String correlationId; 

public static String getCorrelationId() { 
    return correlationId; 
} 

public static void setCorrelationId(String correlationId) { 
    HelloProducerConsumer.correlationId = correlationId; 
} 

public static String getQueueName() { 
    return queueName; 
} 

public static void sendMessage(String threadName) { 
    correlationId = UUID.randomUUID().toString(); 
    try { 

     // Start connection 
     QueueConnectionFactory cf = new QueueConnectionFactory(); 
     QueueConnection connection = cf.createQueueConnection(); 
     QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 
     BasicQueue destination = (BasicQueue) session.createQueue(threadName); 
     MessageProducer producer = session.createProducer(destination); 
     connection.start(); 

     // create message to send 
     TextMessage message = session.createTextMessage(); 
     message.setJMSCorrelationID(correlationId); 
     message.setText(threadName + "(" + System.currentTimeMillis() 
       + ") " + correlationId +" from Producer"); 

     System.out.println(correlationId +" Send from Producer"); 
     producer.send(message); 

     // close everything 
     producer.close(); 
     session.close(); 
     connection.close(); 

    } catch (JMSException ex) { 
     System.out.println("Error = " + ex.getMessage()); 
    } 
} 

public static void receivemessage(final String correlationId) { 
    try { 

     QueueConnectionFactory cf = new QueueConnectionFactory(); 
     QueueConnection connection = cf.createQueueConnection(); 
     QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 

     BasicQueue destination = (BasicQueue) session.createQueue(getQueueName()); 

     connection.start(); 

     System.out.println("\n"); 
     System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage"); 
     long now = System.currentTimeMillis(); 

     // receive our message 
     String filter = "JMSCorrelationID = '" + correlationId + "'"; 
     QueueReceiver receiver = session.createReceiver(destination, filter); 
     TextMessage m = (TextMessage) receiver.receive(); 
     System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp()); 

     System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage"); 

     session.close(); 
     connection.close(); 

    } catch (JMSException ex) { 
     System.out.println("Error = " + ex.getMessage()); 
    } 
} 

public static void main(String args[]) { 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId1 = getCorrelationId(); 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId2 = getCorrelationId(); 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId3 = getCorrelationId(); 


    HelloProducerConsumer.receivemessage(correlationId2); 

    HelloProducerConsumer.receivemessage(correlationId1); 

    HelloProducerConsumer.receivemessage(correlationId3); 
} 
}