2012-07-20 53 views
1

在WPF應用程序中,我有一個發佈消息的第三方庫。如何在單線程服務中正確分配多線程消息流?

的消息是這樣的:

public class DialectMessage 
{ 
    public string PathAndQuery { get; private set; } 

    public byte[] Body { get; private set; } 

    public DialectMessage(string pathAndQuery, byte[] body) 
    { 
     this.PathAndQuery = pathAndQuery; 
     this.Body = body; 
    } 
} 

我設置從我app.cs文件外部信息源:

public partial class App : Application 
{ 
    static App() 
    { 
     MyComponent.MessageReceived += MessageReceived; 
     MyComponent.Start(); 
    } 

    private static void MessageReceived(Message message) 
    { 
     //handle message 
    } 

} 

這些信息可以從多個線程在同一時間進行發佈,使可能一次多次調用事件處理程序。

我有一個服務對象,必須解析傳入的消息。這個服務實現了以下接口:

internal interface IDialectService 
{ 
    void Parse(Message message); 
} 

而且我有一個默認的靜態實例在我app.cs文件:

private readonly static IDialectService g_DialectService = new DialectService(); 

爲了簡化分析程序的代碼,我想確保一次只能解析一條消息。

我也想避免鎖定在我的事件處理程序,因爲我不想阻止第三方對象。

因爲這個要求,我不能直接調用從我的消息事件處理程序

什麼是保證這個單線程執行正確的方法g_DialectService.Parse

我的第一個是將我的解析操作封裝在Produce/Consumer模式中。爲了達到這個目標,我已經嘗試以下方法:

  1. 聲明一個BlockingCollection在我app.cs:

    private readonly static BlockingCollection<Message> g_ParseOperations = new BlockingCollection<Message>(); 
    
  2. 更改我的事件處理程序的身體補充的操作:

    private static void MessageReceived(Message message) 
    { 
        g_ParseOperations.Add(message); 
    } 
    
  3. 創建從我的應用程序構造泵收集一個新的線程:

    static App() 
    { 
        MyComponent.MessageReceived += MessageReceived; 
        MyComponent.Start(); 
    
        Task.Factory.StartNew(() => 
        { 
         Message message; 
         while (g_ParseOperations.TryTake(out message)) 
         { 
          g_DialectService.Parse(message); 
         } 
        }); 
    } 
    

但是,這段代碼似乎不起作用。從不調用服務分析方法。

此外,我不確定此模式是否允許我正確關閉應用程序。

我在代碼中改變了什麼以確保一切正常?

PS:我針對.NET 4.5

[編輯]一些搜索後,並the answer of ken2k,我可以看到我是在錯誤的地方採取的通話trytake。

我更新的代碼現在是:

private readonly static CancellationTokenSource g_ShutdownToken = new CancellationTokenSource(); 

    private static void MessageReceived(Message message) 
    { 
     g_ParseOperations.Add(message, g_ShutdownToken.Token); 
    } 

    static App() 
    { 
     MyComponent.MessageReceived += MessageReceived; 
     MyComponent.Start(); 

     Task.Factory.StartNew(() => 
     { 
      while (!g_ShutdownToken.IsCancellationRequested) 
      { 
       var message = g_ParseOperations.Take(g_ShutdownToken.Token); 
       g_DialectService.Parse(message); 
      } 
     }); 
    } 

    protected override void OnExit(ExitEventArgs e) 
    { 
     g_ShutdownToken.Cancel(); 
     base.OnExit(e); 
    } 

此代碼作爲預期。消息按正確的順序處理。但是,一旦我退出應用程序,就會在Take方法上得到一個「CancelledException」,即使我之前只是測試IsCancellationRequested。

+0

那麼,你正在關閉應用程序 - 只是吃例外! – 2012-07-20 12:30:02

+0

..或更好,不要打擾取消線程。 – 2012-07-20 12:31:54

+0

這可能適用於我實際退出應用程序。但就我個人的知識而言,我想知道如何正確關機(想象一下啓動/停止按鈕)。 – 2012-07-20 12:33:23

回答

2

documentation說,大約BlockingCollection.TryTake(out T item)

如果集合是空的,此方法立即返回false。

所以基本上你的循環立即退出。你可能想要的是調用TryTake method用超時參數代替,並退出循環,當mustStop變量變爲true

bool mustStop = false; // Must be set to true on somewhere else when you exit your program 
... 
while (!mustStop) 
{ 
    Message yourMessage; 

    // Waits 500ms if there's nothing in the collection. Avoid to consume 100% CPU 
    // for nothing in the while loop when the collection is empty. 
    if (yourCollection.TryTake(out yourMessage, 500)) 
    { 
     // Parses yourMessage here 
    } 
} 

爲您編輯的問題:如果你的意思是你收到OperationCanceledException,這是好的,正是如何以CancellationToken對象作爲參數的方法必須行爲:)只需捕獲異常並優雅地退出。

+0

發佈後我看到了。讓我更新我的問題,仍然存在一些問題。 – 2012-07-20 12:13:42

+0

@SteveB查看編輯答案 – ken2k 2012-07-20 12:33:19

+0

好的,我明白了。我不能簡單地檢查我的取消資格。取消要求。因爲,在取消實際發生之前完成此檢查。大部分時間,Take方法將處於等待狀態。我已經添加了一個簡單的try/catch(OperationCancelledException)並且工作正常。謝謝 – 2012-07-20 12:36:00