2010-03-02 63 views
3

我有一個C#應用程序,它訂閱我們的消息系統上的一個主題以進行值更新。當有新的價值出現時,我會做一些處理然後繼續。問題是,更新可能會比應用程序可以處理它們更快。我想要做的就是堅持最新的價值,所以我不想要一個隊列。例如,源發佈值「1」,我的應用程序收到它;在處理過程中,源在我的應用程序完成處理之前發佈序列(2,3,4,5);我的應用程序然後處理值「5」,與先前的值扔掉。如何異步處理郵件,處理時丟棄任何新郵件?

由於它是基於專有消息庫的,因此很難發佈有效的代碼示例,但我認爲這是一種常見模式,我只是無法弄清楚它的名稱......它看起來像處理函數必須在消息傳遞迴調之外的獨立線程上運行,但我不知道如何組織這些,例如該線程是如何通知價值變化的。有關我需要做什麼的一般提示?

回答

2

一個非常簡單的方法可能是這樣的:

private IMessage _next; 

public void ReceiveMessage(IMessage message) 
{ 
    Interlocked.Exchange(ref _next, message); 
} 

public void Process() 
{ 
    IMessage next = Interlocked.Exchange(ref _next, null); 

    if (next != null) 
    { 
     //... 
    } 
} 
+0

+1,但如果我理解正確的問題,第一種方法將更多的是接收者而不是發件人。 – 2010-03-02 15:43:49

+0

我只是改變了它。謝謝! – Matthias 2010-03-02 15:45:33

+0

謝謝,我想我知道它的主旨,但Process()如何被調用?它是否必須在某種其他線程中運行某種循環,或者在接收到新消息時是否有任何方法來調用它,如果它尚未處理先前的值? – toasteroven 2010-03-02 16:30:07

0

這不是一個「模式」,但您可以使用共享數據結構來保存該值。如果從消息傳遞庫只接收到一個值,那麼一個簡單的對象就可以。否則,您可能可以使用散列表來存儲多個消息值(如果需要)。

例如,在消息接收線程上:當消息進入時,使用其值添加/更新數據結構。在線程方面,您可以定期檢查這個數據結構以確保您仍然具有相同的值。如果你不這樣做,那麼放棄你已經完成的任何處理並用新值重新處理。

當然,您需要確保數據結構在線程之間正確同步。

1

一般來說,人們使用消息傳遞系統來防止丟失消息。我最初對解決方案的反應是接收入站數據的線程,如果處理線程已經在運行,那麼您將丟棄數據並等待下一個元素並重復,然後嘗試將其傳遞到處理線程。

0

顯然,消息傳遞庫的設計可以影響處理此問題的最佳方法。我過去是如何使用類似功能的庫來完成這項工作的,我有一個線程可以監聽事件,並將它們放入隊列中,然後讓Threadpool工作人員將郵件出列並處理它們。

您可以在多線程asyncronous作業隊列讀了起來:

Mutlithreaded Job Queue

Work Queue Threading

0

一個簡單的方法是使用一個成員變量來保存收到的最後一個值,並用一個鎖包裝它。另一種方法是將傳入值推入堆棧。當您爲新的價值已經準備好,調用Stack.Pop(),然後Stack.Clear():

public static class Incoming 
{ 
    private static object locker = new object(); 
    private static object lastMessage = null; 

    public static object GetMessage() 
    { 
     lock (locker) 
     { 
      object tempMessage = lastMessage; 
      lastMessage = null; 
      return tempMessage; 
     } 
    } 
    public static void SetMessage(object messageArg) 
    { 
     lock (locker) 
     { 
      lastMessage = messageArg; 
     } 
    } 

    private static Stack<object> messageStack = new Stack<object>(); 
    public static object GetMessageStack() 
    { 
     lock (locker) 
     { 
      object tempMessage = messageStack.Count > 0 ? messageStack.Pop() : null; 
      messageStack.Clear(); 
      return tempMessage; 
     } 
    } 
    public static void SetMessageStack(object messageArg) 
    { 
     lock (locker) 
     { 
      messageStack.Push(messageArg); 
     } 
    } 
} 

把一個單獨的線程處理功能是一個好主意。要麼使用處理線程的回調方法來表明它已準備好接收另一條消息,要麼表示已完成,然後讓主線程在收到消息時啓動新的處理器線程(通過上面的SetMessage ...) 。