2017-03-09 121 views
0

ActiveMQ TCP 連接失敗:我使用的是 ActiveMQ 5.10 版本,並配置了 WSO2 ESB 與它配合以進行消息處理。

大約 7-10 天后,ActiveMQ 會拋出 TCP 連接失敗異常;由於 ESB 無法建立成功的 TCP 連接,因此無法把消息提交到隊列中。

在這種情況下,我重新啓動服務器,並再次運行7-10天,同樣的事情重複。

我的問題是

ActiveMQ 停止提供成功的 TCP 連接,確切原因可能是什麼?

爲什麼重啓服務器後它恢復正常狀態..?

有沒有解決這個問題的最佳方案?

activemq.xml 文件中的內存配置:

<!--
  Broker-wide resource limits (fragment of activemq.xml).
  sendFailIfNoSpace="true": per the ActiveMQ producer flow control docs, a
  send fails with an exception instead of blocking once a limit is reached.
  NOTE(review): confirm the memory limit fits inside the broker JVM heap and
  that the store/temp limits fit on the actual disk.
-->
<systemUsage>
  <systemUsage sendFailIfNoSpace="true">
    <!-- Memory available for messages held by the broker. -->
    <memoryUsage>
      <memoryUsage limit="1430 mb"/>
    </memoryUsage>
    <!-- Disk space for the persistent message store. -->
    <storeUsage>
      <storeUsage limit="300 gb"/>
    </storeUsage>
    <!-- Disk space for temporary (non-persistent) message spooling. -->
    <tempUsage>
      <tempUsage limit="100 gb"/>
    </tempUsage>
  </systemUsage>
</systemUsage>

下面是把消息放入隊列的代理服務。代理服務會先檢查用戶認證是否通過,通過後才允許用戶把消息放入隊列;這裏我使用了類中介(class mediator),由它連接 ActiveMQ 並寫入消息。

<!--
  Proxy service "JmsStore2.0", exposed over http and https.
  The in-sequence captures request data into properties, replaces the body
  with a <send> login request built from the username/password transport
  headers, and calls the Login2.0 endpoint. The login response is handled by
  the sequence named JmsStore_Seq (see the receive attribute on <send>).
-->
<proxy xmlns="http://ws.apache.org/ns/synapse" 
     name="JmsStore2.0" 
     transports="https http" 
     startOnLoad="true" 
     trace="disable" 
     statistics="enable"> 
    <description/> 
    <target> 
     <inSequence onError="fault"> 
     <property name="messageType" value="application/json" scope="axis2"/> 
     <property name="FORCE_ERROR_ON_SOAP_FAULT" value="true"/> 
     <!-- NOTE(review): this value is later used as the broker URL for a client
          connection. 0.0.0.0 is a wildcard bind address; as a connect target
          it is platform dependent. Confirm whether an explicit host
          (for example localhost) is intended. -->
     <property name="jmsuri" value="tcp://0.0.0.0:61616"/> 
     <!-- jmsqueue, username and password are read from transport headers of
          the incoming request. -->
     <property name="jmsqueue" expression="get-property('transport', 'jmsqueue')"/> 
     <!-- Keep the original request body so it can be used after the login
          call has replaced the message body. -->
     <property name="readingspayload" expression="$body" type="OM"/> 
     <property name="username" expression="get-property('transport', 'username')"/> 
     <property name="password" expression="get-property('transport', 'password')"/> 
     <property name="PartyBranchID" 
        expression="//FieldValue/text()" 
        scope="default" 
        type="STRING"/> 
     <property name="Body" expression="$body" scope="default" type="STRING"/> 
     <!-- The username header is split on '|': the text before it becomes
          usercode, the text after it becomes clientid. -->
     <property name="usercode" 
        expression="fn:substring-before(get-property('username'),'|')" 
        scope="default" 
        type="STRING"/> 
     <property name="clientid" 
        expression="fn:substring-after(get-property('username'),'|')" 
        scope="default" 
        type="STRING"/> 
     <property name="requestMsgId" 
        expression="get-property('MessageID')" 
        scope="default" 
        type="STRING"/> 
     <property name="client_ip_address" 
        expression="get-property('axis2','REMOTE_ADDR')" 
        scope="default" 
        type="STRING"/> 

     <!-- Build the login request body from the username and password
          properties ($1 and $2). -->
     <payloadFactory media-type="xml"> 
      <format> 
       <send xmlns=""> 
        <username>$1</username> 
        <password>$2</password> 
       </send> 
      </format> 
      <args> 
       <arg evaluator="xml" expression="get-property('username')"/> 
       <arg evaluator="xml" expression="get-property('password')"/> 
      </args> 
     </payloadFactory> 
     <!-- Call the login service; its response is processed by JmsStore_Seq. -->
     <send receive="JmsStore_Seq"> 
      <endpoint> 
       <address uri="http://localhost:8282/services/Login2.0" format="soap11"> 
        <suspendOnFailure> 
        <errorCodes>101500,101501,101506,101507,101508,101503,50000</errorCodes> 
        <initialDuration>30</initialDuration> 
        <progressionFactor>1.0</progressionFactor> 
        <maximumDuration>300</maximumDuration> 
        </suspendOnFailure> 
       </address> 
      </endpoint> 
     </send> 
     </inSequence> 
     <outSequence onError="fault"> 
     <send/> 
     </outSequence> 
    </target> 
</proxy> 

序列:

<!--
  Sequence "JmsStore_Seq": handles the login response.
  It extracts Authentication, UserId, WorkOUid and WorkPartyBranchId from the
  response and then branches on the Authentication property:
    - empty:   reply with Status 101503
    - 'false': reply with Status 401
    - else:    wrap the saved request payload and the login data in <PLData>,
               invoke the JMSStoreMediator class mediator, reply with Status 200
-->
<sequence xmlns="http://ws.apache.org/ns/synapse" 
      name="JmsStore_Seq" 
      trace="disable"> 
    <property name="FORCE_ERROR_ON_SOAP_FAULT" value="true"/> 
    <!-- Values extracted from the login response by XPath. -->
    <property xmlns:ns="http://org.apache.synapse/xsd" 
      name="Authentication" 
      expression="//Authentication/text()"/> 
    <property xmlns:ns="http://org.apache.synapse/xsd" 
      name="UserId" 
      expression="//UserId/text()" 
      scope="default" 
      type="STRING"/> 
    <property xmlns:ns="http://org.apache.synapse/xsd" 
      name="WorkOUid" 
      expression="//WorkOUid/text()"/> 
    <property xmlns:ns="http://org.apache.synapse/xsd" 
      name="WorkPartyBranchId" 
      expression="//WorkPartyBranchId/text()"/> 

    <!-- Case 1: no Authentication value was found in the response. -->
    <filter xmlns:ns="http://org.apache.synapse/xsd" 
      xpath="get-property('Authentication')=''"> 
     <then> 
     <payloadFactory media-type="xml"> 
      <format> 
       <ResponseJSON xmlns=""> 
        <Exception>Service trying to connect inactive service</Exception> 
        <Status>101503</Status> 
       </ResponseJSON> 
      </format> 
      <args/> 
     </payloadFactory> 
     <property name="messageType" value="application/json" scope="axis2"/> 
     <property name="HTTP_METHOD" value="POST" scope="axis2" type="STRING"/> 
     <property name="RESPONSE" value="true" scope="default" type="STRING"/> 
     <property name="NO_ENTITY_BODY" scope="axis2" action="remove"/> 
     <send/> 
     </then> 
     <else> 
     <!-- Case 2: the login response says Authentication is 'false'. -->
     <filter xpath="get-property('Authentication')='false'"> 
      <then> 
       <payloadFactory media-type="xml"> 
        <format> 
        <ResponseJSON xmlns=""> 
         <Exception>Authentication Failed</Exception> 
         <Status>401</Status> 
        </ResponseJSON> 
        </format> 
        <args/> 
       </payloadFactory> 
       <property name="messageType" value="application/json" scope="axis2"/> 
       <property name="HTTP_METHOD" value="POST" scope="axis2" type="STRING"/> 
       <property name="RESPONSE" value="true" scope="default" type="STRING"/> 
       <property name="NO_ENTITY_BODY" scope="axis2" action="remove"/> 
       <send/> 
      </then> 
      <else> 
       <!-- Case 3: any other Authentication value. Build the <PLData>
            message from the saved request payload, the ResponseJSON element
            of the login response, and the usercode/clientid/requestMsgId
            properties. -->
       <property name="jmspayload" 
         expression="get-property('readingspayload')" 
         type="OM"/> 
       <property name="ResponseJSON" expression="$body/ResponseJSON" type="OM"/> 
       <!-- These two properties are re-assigned from their own current values. -->
       <property name="jmsuri" expression="get-property('jmsuri')"/> 
       <property name="jmsqueue" expression="get-property('jmsqueue')"/> 
       <payloadFactory media-type="xml"> 
        <format> 
        <PLData> 
         <JMpayload>$1</JMpayload> 
         <AuthData>$2</AuthData> 
         <LogData> 
          <usercode>$3</usercode> 
          <clientid>$4</clientid> 
          <requestMsgId>$5</requestMsgId> 
         </LogData> 
        </PLData> 
        </format> 
        <args> 
        <arg evaluator="xml" expression="get-property('jmspayload')"/> 
        <arg evaluator="xml" expression="get-property('ResponseJSON')"/> 
        <arg evaluator="xml" expression="get-property('usercode')"/> 
        <arg evaluator="xml" expression="get-property('clientid')"/> 
        <arg evaluator="xml" expression="get-property('requestMsgId')"/> 
        </args> 
       </payloadFactory> 
       <!-- Custom class mediator that publishes the current body to the JMS
            destination named by the jmsuri/jmsqueue properties. -->
       <class name="in.youtility.esb.custommediators.JMSStoreMediator"/> 
       <!-- Replace the body with the success reply returned to the caller. -->
       <payloadFactory media-type="xml"> 
        <format> 
        <ResponseJSON xmlns=""> 
         <Body> 
          <Datalist> 
           <Data>Successfully stored</Data> 
          </Datalist> 
         </Body> 
         <Status>200</Status> 
        </ResponseJSON> 
        </format> 
        <args/> 
       </payloadFactory> 
       <property name="messageType" value="application/json" scope="axis2"/> 
       <header name="To" action="remove"/> 
       <property name="NO_ENTITY_BODY" scope="axis2" action="remove"/> 
       <property name="RESPONSE" value="true"/> 
       <send/> 
      </else> 
     </filter> 
     </else> 
    </filter> 
    <description/> 
</sequence> 

類中介(class mediator):

public class JMSStoreMediator extends AbstractMediator implements 

ManagedLifecycle {

Connection connection; 

/**
 * Publishes the current message body as a persistent JMS text message.
 *
 * Reads these message-context properties:
 *   jmsuri   - broker URL handed to ActiveMQConnectionFactory
 *   jmsqueue - name of the destination (queue, or topic when "topic" is true)
 *   topic    - optional; parsed as a boolean, anything but "true" means queue
 *
 * For a topic, the serialized body is converted to JSON and the content of
 * "soapenv:Body" (without its "xmlns:soapenv" entry) is sent instead.
 *
 * On any failure the HTTP status property is set to 500 and
 * handleException(...) is invoked (in Synapse's AbstractMediator this
 * rethrows as a SynapseException, so the trailing log line and the return
 * are only reached on success).
 *
 * @param msgCtx the Synapse message context being mediated
 * @return always true when no exception is raised
 */
public boolean mediate(MessageContext msgCtx) {

    // The connection is held in a local variable on purpose. A class mediator
    // instance is shared by concurrent requests (see the Synapse/WSO2 class
    // mediator docs); when the connection lived in the instance field, two
    // threads could overwrite each other's reference, so one connection was
    // closed twice and the other never closed, slowly leaking broker TCP
    // connections.
    // NOTE(review): the 'connection' field is no longer written here; confirm
    // that init()/destroy() do not depend on it.
    // NOTE(review): a connection per message is still expensive; consider a
    // pooled connection factory if the ActiveMQ pool module is available.
    Connection conn = null;

    try {
        String jmsuri = "" + msgCtx.getProperty("jmsuri");
        // A missing property concatenates to "null", which parses to false.
        boolean topic = Boolean.parseBoolean("" + msgCtx.getProperty("topic"));
        String destinationName = "" + msgCtx.getProperty("jmsqueue");

        ConnectionFactory factory = new ActiveMQConnectionFactory(jmsuri);
        conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination destination = topic
                ? session.createTopic(destinationName)
                : session.createQueue(destinationName);

        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        String xml = "" + msgCtx.getEnvelope().getBody().toStringWithConsume();

        if (topic) {
            // Topic subscribers receive the body content as JSON.
            JSONObject obj = XML.toJSONObject(xml);
            JSONObject ar = obj.getJSONObject("soapenv:Body");
            ar.remove("xmlns:soapenv");
            xml = ar.toString();
        }

        TextMessage message = session.createTextMessage(xml);
        producer.send(message);

    } catch (Exception e) {

        log.info("LogLocation = "+getClass().getName()+",Error in storing message in JMS stacktrace is :"+e.toString());
        ((Axis2MessageContext) msgCtx).setProperty(NhttpConstants.HTTP_SC, 500);
        // Pass the cause along so the stack trace is not lost.
        handleException("Error while storing in the message store", e, msgCtx);

    } finally {
        // conn is still null when createConnection() itself failed; the
        // original code threw a NullPointerException here in that case.
        // Closing the connection also closes its sessions and producers.
        if (conn != null) {
            try {
                conn.close();
            } catch (JMSException e) {
                log.info("LogLocation = "+getClass().getName()+",Error in closing JMS connection stacktrace is :"+e.toString());
            }
        }
    }

    log.info("LogLocation = "+getClass().getName()+",ProxyName = "+msgCtx.getProperty("proxy.name")+
            ",Usercode = "+msgCtx.getProperty("usercode")+",Clientid = "+msgCtx.getProperty("clientid")+
            ",requestMsgId = "+msgCtx.getProperty("requestMsgId")+",Position = END");

    return true;
}
+0

ActiveMQ中的消息數量日增?您如何使用AMQ,使用JMS代理服務和MessageStore? –

+0

嗨@ Jean-Michel感謝您的回覆,我正在使用ESB代理服務,它將在隊列中放置消息,並使用其他JMS代理服務來監聽此隊列並處理它。經常有消息進入隊列,無法處理任何問題,直到Active MQ成功連接到ESB。 – user4045063

+0

我發佈了第一個答案,但告訴我在ActiveMQ中,如果發生故障(在隊列中等待數千條消息),消息的數量是否增加 –

回答

0

你應該分享你的代理(proxy)配置,不過你可以先確認以下幾點:

  • 你已經設置了 CacheLevel,以避免每次輪詢都建立新連接:<parameter name="transport.jms.CacheLevel">consumer</parameter>
  • 你已經設置爲每次輪詢時消費所有消息:<parameter name="transport.jms.MaxMessagesPerTask">-1</parameter>
+0

我已經分享了我的代理代碼,你有一看。 – user4045063