2015-11-14 99 views
0

這是我第一次嘗試編寫Windows服務。多線程Windows服務處理Windows Message Queue

此窗口服務必須處理2個窗口消息隊列。

每個消息隊列應該有自己的線程,但我似乎無法得到架構就地。

我跟着這個Windows Service to run constantly,它允許我創建一個線程,我正在處理一個隊列。

所以這是我的服務類:

protected override void OnStart(string[] args) 
    { 
     _thread = new Thread(WorkerThreadFunc) { Name = "Address Calculator Thread", IsBackground = true }; 
     _thread.Start(); 
    } 

    private void WorkerThreadFunc() 
    { 
     _addressCalculator = new GACAddressCalculator(); 

     while (!_shutdownEvent.WaitOne(0)) 
     { 
      _addressCalculator.StartAddressCalculation(); 
     } 
    } 



    protected override void OnStop() 
    { 
     _shutdownEvent.Set(); 
     if (!_thread.Join(5000)) 
     { // give the thread 5 seconds to stop 
      _thread.Abort(); 
     } 
    } 

在我GACAddressCalculator.StartAddressCalculation()我創建一個隊列處理器的對象,看起來像這樣:

public void StartAddressCalculation() 
    { 
     try 
     { 
      var googleQueue = new GISGoogleQueue("VehMonLogGISGoogle", 1, _gacLogger, 1); 
      googleQueue.ProccessMessageQueue(); 

     } 
     catch (Exception ex) 
     { 

     } 

    } 

這是GISGoogleQueue

public class GISGoogleQueue : BaseMessageQueue 
{ 


    public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread) 
     : base(queueName, threadCount, logger, messagesPerThread) 
    { 
    } 

    public override void ProccessMessageQueue() 
    { 
     if (!MessageQueue.Exists(base.QueueName)) 
     { 
      _logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName)); 
      return; 
     } 

     var messageQueue = new MessageQueue(QueueName); 
     var myVehMonLog = new VehMonLog(); 
     var o = new Object(); 
     var arrTypes = new Type[2]; 
     arrTypes[0] = myVehMonLog.GetType(); 
     arrTypes[1] = o.GetType(); 
     messageQueue.Formatter = new XmlMessageFormatter(arrTypes); 

     using (var pool = new Pool(ThreadCount)) 
     { 

      // Infinite loop to process all messages in Queue 
      for (; ;) 
      { 
       for (var i = 0; i < MessagesPerThread; i++) 
       { 
        try 
        { 
         while (pool.TaskCount() >= MessagesPerThread) ; // Stop execution until Tasks in pool have been executed 


         var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0)); // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins 

         if (message != null) // Check if message is not Null 
         { 
          var monLog = (VehMonLog)message.Body; 
          pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool 
         } 

        } 
        catch (Exception ex) 
        { 

        } 

       } 
      } 
     } 
    } 

}

現在這適用於1消息隊列,但如果我想處理另一個消息隊列它不會發生,因爲我在ProccessMessageQueue方法中有一個無限循環。

我想在一個單獨的線程中執行每個隊列。

我認爲我在WorkerThreadFunc()中犯了一個錯誤,我必須以某種方式從那裏啓動兩個線程或在OnStart()

此外,如果您有任何提示如何改善此服務將是偉大的。

通過我使用的池類從這個答案https://stackoverflow.com/a/436552/1910735線程池裏面ProccessMessageQueue

回答

1

我建議改變你的服務類,如下所示(下面的註釋):

protected override void OnStart(string[] args) 
{ 
    _thread = new Thread(WorkerThreadFunc) 
       { 
        Name = "Run Constantly Thread", 
        IsBackground = true 
       }; 
    _thread.Start(); 
} 

GISGoogleQueue _googleQueue1; 
GISGoogleQueue _googleQueue2; 
private void WorkerThreadFunc() 
{ 
    // This thread is exclusively used to keep the service running. 
    // As such, there's no real need for a while loop here. Create 
    // the necessary objects, start them, wait for shutdown, and 
    // cleanup. 
    _googleQueue1 = new GISGoogleQueue(...); 
    _googleQueue1.Start(); 
    _googleQueue2 = new GISGoogleQueue(...); 
    _googleQueue2.Start(); 

    _shutdownEvent.WaitOne(); // infinite wait 

    _googleQueue1.Shutdown(); 
    _googleQueue2.Shutdown(); 
} 

protected override void OnStop() 
{ 
    _shutdownEvent.Set(); 
    if (!_thread.Join(5000)) 
    { 
     // give the thread 5 seconds to stop 
     _thread.Abort(); 
    } 
} 

我忽略了你的GACAddressCalculator。從你展示的內容來看,它似乎是圍繞GISGoogleQueue的薄包裝。顯然,如果它實際上做了一些你沒有顯示的東西,那就需要重新考慮。

請注意,在WorkerThreadFunc()中創建了兩個GISGoogleQueue對象。下面我們來看看如何創建這些對象來實現合適的線程模型。

public class GISGoogleQueue : BaseMessageQueue 
{ 
    System.Threading.Thread _thread; 
    System.Threading.ManualResetEvent _shutdownEvent; 

    public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread) 
     : base(queueName, threadCount, logger, messagesPerThread) 
    { 
     // Let this class wrap a thread object. Create it here. 
     _thread = new Thread(RunMessageQueueFunc() 
        { 
         Name = "Run Message Queue Thread " + Guid.NewGuid().ToString(), 
         IsBackground = true 
        }; 
     _shutdownEvent = new ManualResetEvent(false); 
    } 

    public Start() 
    { 
     _thread.Start(); 
    } 

    public Shutdown() 
    { 
     _shutdownEvent.Set(); 
     if (!_thread.Join(5000)) 
     { 
      // give the thread 5 seconds to stop 
      _thread.Abort(); 
     } 
    } 

    private void RunMessageQueueFunc() 
    { 
     if (!MessageQueue.Exists(base.QueueName)) 
     { 
      _logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName)); 
      return; 
     } 

     var messageQueue = new MessageQueue(QueueName); 
     var myVehMonLog = new VehMonLog(); 
     var o = new Object(); 
     var arrTypes = new Type[2]; 
     arrTypes[0] = myVehMonLog.GetType(); 
     arrTypes[1] = o.GetType(); 
     messageQueue.Formatter = new XmlMessageFormatter(arrTypes); 

     using (var pool = new Pool(ThreadCount)) 
     { 
      // Here's where we'll wait for the shutdown event to occur. 
      while (!_shutdownEvent.WaitOne(0)) 
      { 
       for (var i = 0; i < MessagesPerThread; i++) 
       { 
        try 
        { 
         // Stop execution until Tasks in pool have been executed 
         while (pool.TaskCount() >= MessagesPerThread) ; 

         // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins 
         var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0)); 

         if (message != null) // Check if message is not Null 
         { 
          var monLog = (VehMonLog)message.Body; 
          pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool 
         } 
        } 
        catch (Exception ex) 
        { 
        } 
       } 
      } 
     } 
    } 
} 

這種方法中心周圍使用Thread對象由GISGoogleQueue類纏繞。對於您創建的每個GISGoogleQueue對象,您將得到一個包裹線程,該線程將在GISGoogleQueue對象上調用Start()後執行該工作。

幾點。在RunMessageQueueFunc()中,您檢查是否存在隊列的名稱。如果沒有,則退出該功能。IF出現這種情況,線程退出了。問題在於,您可能希望在此過程中儘早進行檢查。只是一個想法。

其次,注意你的無限循環已取代對抗_shutdownEvent對象進行檢查。這樣,當服務關閉時,循環將停止。對於時效性,你需要確保一個完整的一次循環並不需要太長時間。否則,您可能會在關機後5秒鐘中止線程。該中止只存在,以確保一切都拆了,但如果可能的話,應避免。

我知道很多人會更喜歡使用Task類來做這樣的事情。看來,你是裏面RunMessageQueueFunc()。但對於在這個過程中運行的線程,我認爲Task類是錯誤的選擇,因爲它將線程池中的線程關聯起來。對我來說,那Thread類是什麼建立。

HTH

+1

感謝您的好評。我認爲你在RunMessageQueueFunc()中有一個錯字:while(!_shutdownEvent.Wait(0))我認爲它應該是:!_shutdownEvent.WaitOne(0) –

+0

不客氣。糾正了錯誤。 –

0

您可以使用Parallel.ForEach這樣的方式;

Parallel.ForEach(queueItems, ProcessQueue); //this will process each queue item in a separate thread 


private void ProcessQueue(QueueItem queue) 
{ 
    //your processing logic  
}