2016-11-08 25 views
0

在服務關閉後是否可以重新連接到MSMQ,而無需重新啓動我的應用程序?如果是那麼如何?如何在C#中重新連接到MSMQ(如果服務關閉一段時間)

我的代碼是:

void tmrDelay_Elapsed(object sender, System.Timers.ElapsedEventArgs e) 
    { 
     tmrDelay.Stop(); 

     while (isConnected) 
     { 
      try 
      { 
       if (ImportFromMSMQ.HasMessages()) 
       { 
        MSMQ_ServiceLog.WriteEntry(string.Format("Transfering data from MSMQ to ActiveMQ started at {0}", DateTime.Now.ToString())); 
        while (ImportFromMSMQ.HasMessages()) 
        { 
         ImportFromMSMQ.Run(); 
         MSMQ_ServiceLog.WriteEntry(string.Format("Transfering data from MSMQ to ActiveMQ completed at {0}", DateTime.Now.ToString())); 
        } 
       } 
       else 
       { 
        MSMQ_ServiceLog.WriteEntry(string.Format("MSMQ is empty {0}", DateTime.Now.ToString())); 
       } 
      } 
      catch (Exception ex) 
      { 
       logger.Error(string.Format(" Error in data transfer MSMQ to ActiveMQ {0}", ex.Message)); 
       MSMQ_ServiceLog.WriteEntry(string.Format(" Error in data transfer MSMQ to ActiveMQ {0}", ex.Message), EventLogEntryType.Error, -1); 
      } 
     } 

     tmrDelay.Start(); 
    } 

while循環,我已經添加,但尚未進行測試,因爲懷疑無限循環的東西。如果關閉MSMQ服務,我們將在catch子句中結束,當MSMQ再次啓動時,我們需要重新啓動整個應用程序。但是,如何重新連接到MSMQ並在等待重新啓動時繼續執行此操作?

public class ImportFromMSMQ 
    { 
     private static readonly ILog logger = LogManager.GetLogger(typeof(ImportFromMSMQ)); 
     //Max records per records 
     private static int _maxMessagesPerTransaction = ConstantHelper.MaxMessagesPerTransaction; 
     private static bool _isNotCompleted = true; 
     private static int counter = 0; 
     private static MessageQueue mq; 
     private static long _maxMessageBodySizeLimitInBytes = ConstantHelper.MaxMessageBodySizeLimitInBytes; 

    // Start import 
    public static void Run() 
    { 
     Start(); 
    } 

    public static bool HasMessages() 
    { 
     var _mqName = ConstantHelper.SourceQueue; 
     mq = new System.Messaging.MessageQueue(_mqName); 
     long _totalmsg = MessagesCounter.GetMessageCount(mq); 

     if (_totalmsg > 0) 
     { 
      logger.Info(string.Format(" {0} messages found in the {1} Queue ", _totalmsg.ToString(), _mqName)); 
     } 
     else 
     { 
      logger.Info(string.Format(" There are no messages in the {1} Queue ", _totalmsg.ToString(), _mqName)); 
     } 

     return _totalmsg > 0; 
    } 


    private static void Start() 
    { 

     logger.Info(string.Format("Data transfer starting at {0}", DateTime.Now.ToString())); 
     long _currentMessageBodySizeLimitInBytes = 0; 
     ArrayList messageList = new ArrayList(); 
     System.Messaging.Message mes; 
     System.IO.Stream bodyStream = null; 
     // Create a transaction. 
     MessageQueueTransaction trans = new MessageQueueTransaction(); 
     List<ConventionalData> objectList = new List<ConventionalData>(); 
     IMessageContainer _container = new MessageContainer(); 
     // Begin the transaction. 
     trans.Begin(); 
     try 
     { 
      while (_isNotCompleted && counter < _maxMessagesPerTransaction) 
      { 
       try 
       { 
        counter++; 
        mes = mq.Receive(new TimeSpan(0, 0, 3), trans); 
        if (mes != null) 
        { 
         bodyStream = mes.BodyStream; 
         _currentMessageBodySizeLimitInBytes = _currentMessageBodySizeLimitInBytes + bodyStream.Length; 
        } 

        VisionAir.Messaging.ConventionalData data = ProtoBuf.Serializer.Deserialize<ConventionalData>(mes.BodyStream); 
        objectList.Add(data); 

        _isNotCompleted = _currentMessageBodySizeLimitInBytes <= _maxMessageBodySizeLimitInBytes; 
       } 
       catch 
       { 
        _isNotCompleted = false; 
       } 
      } 

      if (objectList.Count != 0) 
      { 
       logger.Info(string.Format("Starting transfer of {0} messages", objectList.Count)); 
       _container.MQMessages = objectList; 
       ExportToActiveMQ _export = new ExportToActiveMQ(_container, (ExportOption) Enum.Parse(typeof(ExportOption),ConstantHelper.ExportFormat)); 
       _export.Export(); 
      } 

      logger.Info(string.Format("Transfer of {0} messages is completed at {1}",objectList.Count, DateTime.Now.ToString())); 

      // Commit transaction    
      trans.Commit(); 

      counter = 0; //ResetF 
      _isNotCompleted = true; //Reset 
     } 
     catch (Exception ex) 
     { 
      logger.Error(ex); 
      // Roll back the transaction. 
      trans.Abort(); 
      counter = 0; 
     } 
    } 
} 

我已經看過這個問題Service not receiving messages after Message Queuing service restarted,但似乎並沒有完全理解,因此,如果任何人都可以在上面闡述,這將是很好。

我是否必須遞歸地調用我的tmrDelay_Elapsed()方法I catch子句或其他?

+0

基本上這個傢伙說,你不需要重用你的'私人靜態MessageQueue mq';而是每次創建一個新的,因爲它很容易處理該場景 – Hackerman

+0

好的,所以我只需要創建一個「無限循環」,如果需要,可以創建一個退出參數。然後爲每個消息讀一個新的MessageQueue? – TheLearner

回答

1

我找到了解決辦法,這是簡單的,當我第一次意識到,我們第一次訪問隊列處於HasMessages()的方法,所以我把它改成如下:

public static bool HasMessages() 
     { 
      var _mqName = ConstantHelper.SourceQueue; 
      long _totalmsg = 0; 
      int countAttempts = 0; 

     queueExists = false; 
     while (!queueExists) 
     { 
      if (MessageQueue.Exists(_mqName)) 
      { 
       queueExists = true; 

       mq = new System.Messaging.MessageQueue(_mqName); 
       _totalmsg = MessagesCounter.GetMessageCount(mq); 

       if (_totalmsg > 0) 
       { 
        logger.Info(string.Format(" {0} messages found in the {1} Queue ", _totalmsg.ToString(), _mqName)); 
       } 
       else 
       { 
        logger.Info(string.Format(" There are no messages in the {1} Queue ", _totalmsg.ToString(), _mqName)); 
       } 
      } 
      else 
      { 
       logger.Info(string.Format("No Queue named {0} found, trying again.", _mqName)); 
       countAttempts++; 
       if(countAttempts % 10 == 0) 
       { 
        logger.Info(string.Format("Still no queue found, have you checked Services that Message Queuing(Micrsoft Message Queue aka. MSMQ) is up and running")); 
       } 
       Thread.Sleep(5000); 
      } 
     } 

     return _totalmsg > 0; 
    } 

我檢查MessageQueue.Exists並繼續嘗試它,如果它不存在,直到我找到隊列。當我找到它時,我將queueExists設置爲true,打破while循環,繼續處理收到的數據(如果收到任何數據)。我在某篇文章中看到,使用Thread.Sleep(n)是不好的,但現在我正在使用這個解決方案。

相關問題