幾天編碼我得到了一個解決方案之後。我使用帶有JAX-WS批註和標準JMS的標準EJB3。
我爲了滿足要求所寫的代碼如下。它是一個無狀態的會話Bean,它使用bean管理的事務(BMT),因爲使用標準容器管理的事務(CMT)造成某種類型的掛起,我相信因爲我試圖將兩個JMS交互作爲它們在同一個事務中同樣的方法讓我注意到我必須爲每個與JMS隊列的交互啓動和完成事務。我正在使用weblogic來解決這個問題。我還編寫了一個基本上使用來自隊列端點jms/Pergunta的消息的MDB,並在jms/Resposta隊列上放置了一個響應消息,以此來模擬MQ問題的預期行爲。實際上,在真實場景中,我們可能會在大型機上安裝一些COBOL應用程序,甚至還有其他Java應用程序處理消息並將響應放在響應隊列中。
如果有人需要嘗試此代碼,基本上所有你需要的是有一個容器J2EE5和配置2隊列jndi名稱:jms/Pergunta和jms/Resposta。
的EJB/web服務代碼:
@Stateless
@TransactionManagement(TransactionManagementType.BEAN)
@WebService(name="DJOWebService")
public class DJOSessionBeanWS implements DJOSessionBeanWSLocal {
Logger log = Logger.getLogger(DJOSessionBeanWS.class.getName());
@Resource
SessionContext ejbContext;
// Defines the JMS connection factory.
public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";
// Defines request queue
public final static String QUEUE_PERG = "jms/Pergunta";
// Defines response queue
public final static String QUEUE_RESP = "jms/Resposta";
Context ctx;
QueueConnectionFactory qconFactory;
/**
* Default constructor.
*/
public DJOSessionBeanWS() {
log.info("Construtor DJOSessionBeanWS");
}
@WebMethod(operationName = "processaMensagem")
public String processaMensagem(String mensagemEntrada, String idUnica)
{
//gets UserTransaction reference as this is a BMT EJB.
UserTransaction ut = ejbContext.getUserTransaction();
try {
ctx = new InitialContext();
//get the factory before any transaction it is a weblogic resource.
qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
log.info("Got QueueConnectionFactory");
ut.begin();
QueueConnection qcon = qconFactory.createQueueConnection();
QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue qs = (Queue) (new InitialContext().lookup("jms/Pergunta"));
TextMessage message = qsession.createTextMessage("this is a request message");
message.setJMSCorrelationID(idUnica);
qsession.createSender(qs).send(message);
ut.commit();
qcon.close();
//had to finish and start a new transaction, I decided also get new references for all JMS related objects, not sure if this is REALLY required
ut.begin();
QueueConnection queuecon = qconFactory.createQueueConnection();
Queue qreceive = (Queue) (new InitialContext().lookup("jms/Resposta"));
QueueSession queuesession = queuecon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
String messageSelector = "JMSCorrelationID = '" + idUnica + "'";
//creates que receiver and sets a message selector to get only related message from the response queue.
QueueReceiver qr = queuesession.createReceiver(qreceive, messageSelector);
queuecon.start();
//sets the timeout to keep waiting for the response...
TextMessage tresposta = (TextMessage) qr.receive(10000);
if(tresposta != null)
{
ut.commit();
queuecon.close();
return(tresposta.toString());
}
else{
//commints anyway.. does not have a response though
ut.commit();
queuecon.close();
log.info("null reply, returned by timeout..");
return "Got no reponse message.";
}
} catch (Exception e) {
log.severe("Unexpected error occurred ==>> " + e.getMessage());
e.printStackTrace();
try {
ut.commit();
} catch (Exception ex) {
ex.printStackTrace();
}
return "Error committing transaction after some other error executing ==> " + e.getMessage();
}
}
}
這是對於其中嘲笑此問題的MQ側的MDB的代碼。在我的測試過程中,我有一個Thread.sleep片段來模擬和測試客戶端的超時以驗證解決方案,但它不存在於此版本中。
/**
* Mock to get message from request queue and publish a new one on the response queue.
*/
@MessageDriven(
activationConfig = { @ActivationConfigProperty(
propertyName = "destinationType", propertyValue = "javax.jms.Queue"
) },
mappedName = "jms/Pergunta")
public class ConsomePerguntaPublicaRespostaMDB implements MessageListener {
Logger log = Logger.getLogger(ConsomePerguntaPublicaRespostaMDB.class.getName());
// Defines the JMS connection factory.
public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";
// Define Queue de resposta
public final static String QUEUE_RESP = "jms/Resposta";
Context ctx;
QueueConnectionFactory qconFactory;
/**
* Default constructor.
*/
public ConsomePerguntaPublicaRespostaMDB() {
log.info("Executou construtor ConsomePerguntaPublicaRespostaMDB");
try {
ctx = new InitialContext();
} catch (NamingException e) {
e.printStackTrace();
}
}
/**
* @see MessageListener#onMessage(Message)
*/
public void onMessage(Message message) {
log.info("Recuperou mensagem da fila jms/FilaPergunta, executando ConsomePerguntaPublicaRespostaMDB.onMessage");
TextMessage tm = (TextMessage) message;
try {
log.info("Mensagem recebida no onMessage ==>> " + tm.getText());
//pega id da mensagem na fila de pergunta para setar corretamente na fila de resposta.
String idMensagem = tm.getJMSCorrelationID();
log.info("Id de mensagem que sera usada na resposta ==>> " + idMensagem);
qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
log.info("Inicializou contexto jndi e deu lookup na QueueConnectionFactory do weblogic com sucesso. Enviando mensagem");
QueueConnection qcon = qconFactory.createQueueConnection();
QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) (ctx.lookup("jms/Resposta"));
TextMessage tmessage = qsession.createTextMessage("Mensagem jms para postar na fila de resposta...");
tmessage.setJMSCorrelationID(idMensagem);
qsession.createSender(queue).send(tmessage);
} catch (JMSException e) {
log.severe("Erro no onMessage ==>> " + e.getMessage());
e.printStackTrace();
} catch (NamingException e) {
log.severe("Erro no lookup ==>> " + e.getMessage());
e.printStackTrace();
}
}
}
[]中
是的,我們有一個錯誤隊列,其中超時消息轉發和另一段代碼處理它們。 – 2011-01-11 13:02:44