2011-03-31 143 views
1

我正在爲Web服務編寫一個相對簡單的「代理」應用程序。總體思路是,TCP服務器(帶有異步連接)將從客戶端讀取(字符串)數據,並將該數據(作爲讀取回調函數的一部分)放入兩個隊列(Q1和Q2)之一。另一個線程將讀取這些隊列中的數據並將其傳遞給Web服務。 Q1中的數據應優先於Q2中的任何數據。.NET生產者 - 消費者問題

我一直在閱讀有關生產者/消費者模式,這似乎是我想要實現的關於隊列。由於我的入隊和出隊操作將在不同的線程上發生,似乎很明顯,我的隊列需要線程安全並支持某種鎖定機制?這是一個.NET 4.0應用程序,我在新的BlockingCollection和ConcurrentQueue類中看到了文檔,但我不確定在這種情況下究竟有什麼區別或者如何實現它們。任何人都可以對此有所瞭解嗎?謝謝!

回答

2

我會做像下面的類。當您生成一個項目以將其添加到其中一個隊列時,您可以撥打Enqueue()。這種方法總是立即返回(幾乎)。在另一個線程中,當您準備好使用某個物品時,請致電Dequeue()。它試圖首先從高優先級隊列中取出。如果在任何隊列中沒有可用項目,則呼叫阻止。當你完成生產時,你可以撥打Complete()。在該呼叫已經完成並且兩個隊列都是空的情況下,下一個呼叫(或當前被阻止的呼叫)到Dequeue()將拋出InvalidOperationException

如果您的製片人長時間比您的消費者更快,您應該排隊(new BlockingCollection<T>(capacity))。但在這種情況下,如果只有一個線程同時產生低優先級和高優先級項目,那麼高優先級項目可能需要等待低優先級項目。你可以通過一個線程來產生高優先級的項目和一個低優先級的項目來解決這個問題。或者你只能綁定高優先級的隊列,並希望你不會一次獲得一百萬個低優先級的項目。

class Worker<T> 
{ 
    BlockingCollection<T> m_highPriorityQueue = new BlockingCollection<T>(); 
    BlockingCollection<T> m_lowPriorityQueue = new BlockingCollection<T>(); 

    public void Enqueue(T item, bool highPriority) 
    { 
     BlockingCollection<T> queue; 
     if (highPriority) 
      queue = m_highPriorityQueue; 
     else 
      queue = m_lowPriorityQueue; 

     queue.Add(item); 
    } 

    public T Dequeue() 
    { 
     T result; 

     if (!m_highPriorityQueue.IsCompleted) 
     { 
      if (m_highPriorityQueue.TryTake(out result)) 
       return result; 
     } 

     if (!m_lowPriorityQueue.IsCompleted) 
     { 
      if (m_lowPriorityQueue.TryTake(out result)) 
       return result; 
     } 

     if (m_highPriorityQueue.IsCompleted && m_lowPriorityQueue.IsCompleted) 
      throw new InvalidOperationException("All work is done."); 
     else 
     { 
      try 
      { 
       BlockingCollection<T>.TakeFromAny(
        new[] { m_highPriorityQueue, m_lowPriorityQueue }, 
        out result); 
      } 
      catch (ArgumentException ex) 
      { 
       throw new InvalidOperationException("All work is done.", ex); 
      } 

      return result; 
     } 
    } 

    public void Complete() 
    { 
     m_highPriorityQueue.CompleteAdding(); 
     m_lowPriorityQueue.CompleteAdding(); 
    } 
} 
+0

非常感謝!我非常欣賞這些信息和建議! – user685869 2011-04-01 14:00:03

0

BlockingCollection默認使用ConcurrentQueue。應該適合你的應用。如果您將F#與郵箱和異步塊一起使用,可能會更容易。我早些時候做了一個普通實現的示例文章。

Map/reduce with F# Agent or MailboxProcessor