2010-12-09 39 views
1

我只是爲我處理一個新的場景,我認爲這可能是一些常見的:) ..如何使用請求 - 應答模式將JMS打包到同步調用中的WebSphere MQ橋接?

根據要求,我需要建立一個用戶體驗,像一個同步在線交易Web服務調用,它使用異步JMS-MQ橋實際將調用委派給IBM MQ系列。

客戶端調用Web服務,而不是將其消息發佈到將傳遞到WebSphere MQ的App服務器上的JMS隊列中,並且處理後的響應將傳回給FIXED JMS隊列端點中的應用服務器。

需求處理此事務,如果WebSphere MQ未在規定的時間內傳遞響應,則該事務需要超時,而Web服務應向客戶端發送超時信號並忽略此事務。

該問題的草圖如下。

我需要阻止對Web服務的請求,直到響應到達或超時。

比我正在尋找一些開放的圖書館來幫助我完成這項任務。 或者唯一的解決方案是阻止一個線程並保持響應的池? 也許我可以實現一些塊與監聽器在響應到達時被通知?

有一點討論會對我現在很有幫助,試圖澄清我的想法。 有什麼建議嗎?

我有一個草圖,我希望能幫助清除畫面;)

alt text

回答

1

幾天編碼我得到了一個解決方案之後。我使用帶有JAX-WS批註和標準JMS的標準EJB3。

我爲了滿足要求所寫的代碼如下。它是一個無狀態的會話Bean,它使用bean管理的事務(BMT),因爲使用標準容器管理的事務(CMT)造成某種類型的掛起,我相信因爲我試圖將兩個JMS交互作爲它們在同一個事務中同樣的方法讓我注意到我必須爲每個與JMS隊列的交互啓動和完成事務。我正在使用weblogic來解決這個問題。我還編寫了一個基本上使用來自隊列端點jms/Pergunta的消息的MDB,並在jms/Resposta隊列上放置了一個響應消息,以此來模擬MQ問題的預期行爲。實際上,在真實場景中,我們可能會在大型機上安裝一些COBOL應用程序,甚至還有其他Java應用程序處理消息並將響應放在響應隊列中。

如果有人需要嘗試此代碼,基本上所有你需要的是有一個容器J2EE5和配置2隊列jndi名稱:jms/Pergunta和jms/Resposta。

的EJB/web服務代碼:

@Stateless 
@TransactionManagement(TransactionManagementType.BEAN) 
@WebService(name="DJOWebService") 
public class DJOSessionBeanWS implements DJOSessionBeanWSLocal { 

    Logger log = Logger.getLogger(DJOSessionBeanWS.class.getName()); 

    @Resource 
    SessionContext ejbContext; 

    // Defines the JMS connection factory. 
    public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory"; 

    // Defines request queue 
    public final static String QUEUE_PERG = "jms/Pergunta"; 

    // Defines response queue 
    public final static String QUEUE_RESP = "jms/Resposta"; 


    Context ctx; 
    QueueConnectionFactory qconFactory; 

    /** 
    * Default constructor. 
    */ 
    public DJOSessionBeanWS() { 
     log.info("Construtor DJOSessionBeanWS"); 
    } 

    @WebMethod(operationName = "processaMensagem") 
    public String processaMensagem(String mensagemEntrada, String idUnica) 
    { 
     //gets UserTransaction reference as this is a BMT EJB. 
     UserTransaction ut = ejbContext.getUserTransaction(); 
     try { 

      ctx = new InitialContext(); 
      //get the factory before any transaction it is a weblogic resource. 
      qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY); 
      log.info("Got QueueConnectionFactory"); 
      ut.begin(); 
      QueueConnection qcon = qconFactory.createQueueConnection(); 
      QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 
      Queue qs = (Queue) (new InitialContext().lookup("jms/Pergunta")); 
      TextMessage message = qsession.createTextMessage("this is a request message"); 
      message.setJMSCorrelationID(idUnica); 
      qsession.createSender(qs).send(message); 
      ut.commit(); 
      qcon.close(); 
      //had to finish and start a new transaction, I decided also get new references for all JMS related objects, not sure if this is REALLY required 
      ut.begin(); 
      QueueConnection queuecon = qconFactory.createQueueConnection(); 
      Queue qreceive = (Queue) (new InitialContext().lookup("jms/Resposta")); 
      QueueSession queuesession = queuecon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 
      String messageSelector = "JMSCorrelationID = '" + idUnica + "'"; 
      //creates que receiver and sets a message selector to get only related message from the response queue. 
        QueueReceiver qr = queuesession.createReceiver(qreceive, messageSelector); 
      queuecon.start(); 
      //sets the timeout to keep waiting for the response... 
      TextMessage tresposta = (TextMessage) qr.receive(10000); 
      if(tresposta != null) 
      { 
       ut.commit(); 
       queuecon.close(); 
       return(tresposta.toString()); 
      } 
      else{ 
       //commints anyway.. does not have a response though 
       ut.commit(); 
       queuecon.close(); 
       log.info("null reply, returned by timeout.."); 
       return "Got no reponse message."; 
      } 



     } catch (Exception e) { 
      log.severe("Unexpected error occurred ==>> " + e.getMessage()); 
      e.printStackTrace(); 
      try { 
       ut.commit(); 
      } catch (Exception ex) { 
       ex.printStackTrace(); 
      } 
      return "Error committing transaction after some other error executing ==> " + e.getMessage(); 
     } 

    } 
} 

這是對於其中嘲笑此問題的MQ側的MDB的代碼。在我的測試過程中,我有一個Thread.sleep片段來模擬和測試客戶端的超時以驗證解決方案,但它不存在於此版本中。

/** 
* Mock to get message from request queue and publish a new one on the response queue. 
*/ 
@MessageDriven(
     activationConfig = { @ActivationConfigProperty(
       propertyName = "destinationType", propertyValue = "javax.jms.Queue" 
     ) }, 
     mappedName = "jms/Pergunta") 
public class ConsomePerguntaPublicaRespostaMDB implements MessageListener { 

    Logger log = Logger.getLogger(ConsomePerguntaPublicaRespostaMDB.class.getName()); 

    // Defines the JMS connection factory. 
    public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory"; 

    // Define Queue de resposta 
    public final static String QUEUE_RESP = "jms/Resposta"; 


    Context ctx; 
    QueueConnectionFactory qconFactory; 



    /** 
    * Default constructor. 
    */ 
    public ConsomePerguntaPublicaRespostaMDB() { 
     log.info("Executou construtor ConsomePerguntaPublicaRespostaMDB"); 
     try { 
      ctx = new InitialContext(); 
     } catch (NamingException e) { 
      e.printStackTrace(); 
     } 
    } 

    /** 
    * @see MessageListener#onMessage(Message) 
    */ 
    public void onMessage(Message message) { 
     log.info("Recuperou mensagem da fila jms/FilaPergunta, executando ConsomePerguntaPublicaRespostaMDB.onMessage"); 
     TextMessage tm = (TextMessage) message; 

     try { 
      log.info("Mensagem recebida no onMessage ==>> " + tm.getText()); 

      //pega id da mensagem na fila de pergunta para setar corretamente na fila de resposta. 
      String idMensagem = tm.getJMSCorrelationID(); 
      log.info("Id de mensagem que sera usada na resposta ==>> " + idMensagem); 

      qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY); 
      log.info("Inicializou contexto jndi e deu lookup na QueueConnectionFactory do weblogic com sucesso. Enviando mensagem"); 
      QueueConnection qcon = qconFactory.createQueueConnection(); 
      QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 
      Queue queue = (Queue) (ctx.lookup("jms/Resposta")); 
      TextMessage tmessage = qsession.createTextMessage("Mensagem jms para postar na fila de resposta..."); 
      tmessage.setJMSCorrelationID(idMensagem); 
      qsession.createSender(queue).send(tmessage); 
     } catch (JMSException e) { 
      log.severe("Erro no onMessage ==>> " + e.getMessage()); 
      e.printStackTrace(); 
     } catch (NamingException e) { 
      log.severe("Erro no lookup ==>> " + e.getMessage()); 
      e.printStackTrace(); 
     } 

    } 

} 

[]中

2

嘿,感謝張貼自己的解決方案!

是的,接收()超時是在這種情況下最優雅的方式。

請注意,由於超時而未讀取的消息會發生什麼情況。如果你的客戶重新使用相同的隊列,他可能會收到一條陳舊的消息。

確保及時刪除超時消息(如果沒有其他原因,則不要用未處理的消息填充隊列)。

您可以通過代碼(在消息生產者上設置生存時間)或Websphere MQ服務器(使用使消息自動過期的隊列)輕鬆完成此操作。

如果您不能/不想修改代碼的MQ端,後者更容易。這是我會做:)

+0

是的,我們有一個錯誤隊列,其中超時消息轉發和另一段代碼處理它們。 – 2011-01-11 13:02:44

相關問題