2011-05-23 84 views
5

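How do I use a queue in C#? I want one thread that enqueues data and another thread that dequeues data from the queue. Both threads should run at the same time. How can a queue be used from different threads simultaneously?

Is this possible?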

+3

The question is unclear. – 2011-05-23 05:54:47

Answers

0

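One possible implementation is a ring buffer with separate read and write pointers. On each read or write operation, you copy the opposite pointer (this copy must be thread-safe) into a local context and then perform a bulk read or write.

After each read or write, you update your own pointer and pulse an event.

If the reader or writer thread finds that it has no more work to do, it waits on the other thread's event before re-reading the corresponding pointer.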
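
The scheme described above could look roughly like the following sketch. It assumes exactly one reader thread and one writer thread, moves one item at a time rather than in bulk, and relies on the `Volatile` class (.NET 4.5 and later). The class name `SpscRingBuffer<T>` and its members are hypothetical, chosen only for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical single-producer / single-consumer ring buffer (illustrative sketch).
public sealed class SpscRingBuffer<T>
{
    private readonly T[] _items;
    private int _read;   // next slot to read; written only by the consumer
    private int _write;  // next slot to write; written only by the producer
    private readonly AutoResetEvent _dataAvailable = new AutoResetEvent(false);
    private readonly AutoResetEvent _spaceAvailable = new AutoResetEvent(false);

    public SpscRingBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException("capacity");
        // One slot always stays empty so that "full" and "empty" can be told apart.
        _items = new T[capacity + 1];
    }

    // Called by the single writer thread. Blocks while the buffer is full.
    public void Write(T item)
    {
        int write = _write;
        int next = (write + 1) % _items.Length;

        // Re-read the reader's pointer after every wake-up.
        while (next == Volatile.Read(ref _read))
            _spaceAvailable.WaitOne();

        _items[write] = item;
        Volatile.Write(ref _write, next); // publish the item to the reader
        _dataAvailable.Set();             // pulse the reader
    }

    // Called by the single reader thread. Blocks while the buffer is empty.
    public T Read()
    {
        int read = _read;

        // Re-read the writer's pointer after every wake-up.
        while (read == Volatile.Read(ref _write))
            _dataAvailable.WaitOne();

        T item = _items[read];
        _items[read] = default(T);        // release the reference held by the slot
        Volatile.Write(ref _read, (read + 1) % _items.Length);
        _spaceAvailable.Set();            // pulse the writer
        return item;
    }
}
```

Because each pointer is written by only one thread, no lock is needed; the events are only there so that an idle thread sleeps instead of spinning.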

2

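If you use System.Collections.Queue, you can get thread safety through Queue.Synchronized, like this. Each individual call is synchronized; a sequence of calls (for example, checking Count and then calling Dequeue) still needs its own lock.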

using System.Collections;

// Wrap the queue once; every call made through the wrapper locks on the queue's SyncRoot.
Queue queue = Queue.Synchronized(new Queue()); 
queue.Enqueue(new WorkItem()); 
queue.Enqueue(new WorkItem()); 
queue.Clear(); 

If you want to use System.Collections.Generic.Queue<T>, create your own wrapper class. I did this for System.Collections.Generic.Stack<T>:

using System; 
using System.Collections.Generic; 

[Serializable] 
public class SomeStack 
{ 
    private readonly object stackLock = new object(); 

    private readonly Stack<WorkItem> stack; 

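    public SomeStack() // the constructor name must match the class name
    { 
     this.stack = new Stack<WorkItem>(); 
    } 

    // Pushes an item while holding the lock and hands it back to the caller.
    public WorkItem Push(WorkItem context) 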
    { 
     lock (this.stackLock) 
     { 
      this.stack.Push(context); 
     } 

     return context; 
    } 

    public WorkItem Pop() 
    { 
     lock (this.stackLock) 
     { 
      return this.stack.Pop(); 
     } 
    } 
} 
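
Applied to a queue, the same wrapper idea might look like the sketch below. The name `SynchronizedQueue<T>` and the `TryDequeue` method are assumptions made for illustration, not part of the framework; the point is that the emptiness check and the removal happen under a single lock.

```csharp
using System.Collections.Generic;

// Hypothetical lock-based wrapper around Queue<T> (illustrative sketch).
public sealed class SynchronizedQueue<T>
{
    private readonly object _queueLock = new object();
    private readonly Queue<T> _queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (_queueLock)
        {
            _queue.Enqueue(item);
        }
    }

    // Returns false instead of throwing when the queue is empty, so callers
    // never need a separate Count check outside the lock.
    public bool TryDequeue(out T item)
    {
        lock (_queueLock)
        {
            if (_queue.Count > 0)
            {
                item = _queue.Dequeue();
                return true;
            }

            item = default(T);
            return false;
        }
    }

    public int Count
    {
        get { lock (_queueLock) { return _queue.Count; } }
    }
}
```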
0

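You can implement a thread-safe queue using atomic operations. I once wrote the following class for a multiplayer game. It allows multiple threads to write to the queue safely while a single other thread reads from it: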

using System.Collections.Generic; // needed for IEnumerable<T>

/// <summary> 
/// The WaitFreeQueue class implements the Queue abstract data type through a linked list. The WaitFreeQueue 
/// allows thread-safe addition and removal of elements using atomic operations. Multiple threads can add 
/// elements simultaneously, and another thread can remove elements from the queue at the same time. Only one 
/// thread can remove elements from the queue at any given time. 
/// </summary> 
/// <typeparam name="T">The type parameter</typeparam> 
public class WaitFreeQueue<T> 
{ 
    // Private fields 
    // ============== 
    #region Private fields 
    private Node<T> _tail; // The tail of the queue. 
    private Node<T> _head; // The head of the queue. 
    #endregion 



    // Public methods 
    // ============== 
    #region Public methods 
    /// <summary> 
    /// Removes the first item from the queue. This method returns a value to indicate if an item was 
    /// available, and passes the item back through an argument. 
    /// This method is not thread-safe in itself (only one thread can safely access this method at any 
    /// given time) but it is safe to call this method while other threads are enqueueing items. 
    /// 
    /// If no item was available at the time of calling this method, the returned value is initialised 
    /// to the default value that matches this instance's type parameter. For reference types, this is 
    /// a Null reference. 
    /// </summary> 
    /// <param name="value">The value.</param> 
    /// <returns>A boolean value indicating if an element was available (true) or not.</returns> 
    public bool Dequeue(ref T value) 
    { 
     bool succeeded = false; 
     value = default(T); 

     // If there is an element on the queue then we get it. 
     if (null != _head) 
     { 
      // Set the head to the next element in the list, and retrieve the old head. 
      Node<T> head = System.Threading.Interlocked.Exchange<Node<T>>(ref _head, _head.Next); 

      // Sever the element we just pulled off the queue. 
      head.Next = null; 

      // We have succeeded. 
      value = head.Value; 
      succeeded = true; 
     } 

     return succeeded; 
    } 

    /// <summary> 
    /// Adds another item to the end of the queue. This operation is thread-safe, and multiple threads 
    /// can enqueue items while a single other thread dequeues items. 
    /// </summary> 
    /// <param name="value">The value to add.</param> 
    public void Enqueue(T value) 
    { 
     // We create a new node for the specified value; its Next reference starts out null. 
     Node<T> newNode = new Node<T>(value); 

     // In one atomic operation, set the tail of the list to the new node, and remember the old tail. 
     Node<T> previousTail = System.Threading.Interlocked.Exchange<Node<T>>(ref _tail, newNode); 

     // Link the previous tail to the new tail. 
     if (null != previousTail) 
      previousTail.Next = newNode; 

     // If this is the first node in the list, we save it as the head of the queue. 
     System.Threading.Interlocked.CompareExchange<Node<T>>(ref _head, newNode, null); 
    } // Enqueue() 
    #endregion 



    // Public constructor 
    // ================== 
    #region Public constructor 
    /// <summary> 
    /// Constructs a new WaitFreeQueue instance. 
    /// </summary> 
    public WaitFreeQueue() { } 

    /// <summary> 
    /// Constructs a new WaitFreeQueue instance based on the specified list of items. 
    /// The items will be enqueued. The list can be a Null reference. 
    /// </summary> 
    /// <param name="items">The items</param> 
    public WaitFreeQueue(IEnumerable<T> items) 
    { 
     if(null!=items) 
      foreach(T item in items) 
       this.Enqueue(item); 
    } 
    #endregion 



    // Private types 
    // ============= 
    #region Private types 
    /// <summary> 
    /// The Node class represents a single node in the linked list of a WaitFreeQueue. 
    /// It contains the queued-up value and a reference to the next node in the list. 
    /// </summary> 
    /// <typeparam name="U">The type parameter.</typeparam> 
    private class Node<U> 
    { 
     // Public fields 
     // ============= 
     #region Public fields 
     public Node<U> Next; 
     public U Value; 
     #endregion 



     // Public constructors 
     // =================== 
     #region Public constructors 
     /// <summary> 
     /// Constructs a new node with the specified value. 
     /// </summary> 
     /// <param name="value">The value</param> 
     public Node(U value) 
     { 
      this.Value = value; 
     } 
     #endregion 

    } // Node generic class 
    #endregion 

} // WaitFreeQueue class 

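If you can live with the restriction that multiple threads may enqueue but only a single thread may dequeue, you can use it. This is great for games, because it means no locking is needed between threads.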
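
One delicate spot in a queue of this kind is the moment the reader removes the last node while a writer is linking a new one; if both touch the head at once, an item can be dropped. A common way to avoid that case is to keep a permanent sentinel node so the reader never sets the head to null. The following is a sketch of that variant, not the class above; the name `MpscQueue<T>` is hypothetical and it assumes .NET 4.5 or later for `Volatile`.

```csharp
using System.Threading;

// Hypothetical multi-producer / single-consumer queue with a sentinel node (sketch).
public sealed class MpscQueue<T>
{
    private sealed class Node
    {
        public T Value;
        public Node Next;
    }

    private Node _head; // current sentinel; touched only by the consumer
    private Node _tail; // last node; swapped atomically by producers

    public MpscQueue()
    {
        // Head and tail start at the same empty sentinel node.
        _head = _tail = new Node();
    }

    // Safe to call from any number of threads.
    public void Enqueue(T value)
    {
        Node node = new Node { Value = value };

        // Atomically claim the tail position, then link the old tail to the new node.
        Node previous = Interlocked.Exchange(ref _tail, node);
        Volatile.Write(ref previous.Next, node);
    }

    // Must only be called from one thread at a time.
    public bool TryDequeue(out T value)
    {
        Node next = Volatile.Read(ref _head.Next);
        if (next == null)
        {
            value = default(T);
            return false;
        }

        value = next.Value;
        next.Value = default(T); // 'next' becomes the new sentinel
        _head = next;
        return true;
    }
}
```

With this layout `TryDequeue` can briefly return false while a writer sits between its two steps, but the item shows up on a later call instead of being lost.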

0

A simple usage example:

using System.Collections.Generic; // Queue<T>
using System.Threading;           // ThreadPool, WaitCallback

namespace ConsoleApplication1 
{ 
    class Program 
    { 

     static void Main(string[] args) 
     { 
      ExampleQueue eq = new ExampleQueue(); 
      eq.Run(); 

      // Wait... 
      System.Threading.Thread.Sleep(100000); 
     } 


    } 

    class ExampleQueue 
    { 
     private Queue<int> _myQueue = new Queue<int>(); 

     public void Run() 
     { 
      ThreadPool.QueueUserWorkItem(new WaitCallback(PushToQueue), null); 
      ThreadPool.QueueUserWorkItem(new WaitCallback(PopFromQueue), null); 
     } 

     private void PushToQueue(object Dummy) 
     { 
      for (int i = 0; i <= 1000; i++) 
      { 
       lock (_myQueue) 
       { 
        _myQueue.Enqueue(i); 
       } 
      } 
      System.Console.WriteLine("END PushToQueue"); 

     } 

     private void PopFromQueue(object Dummy) 
     { 
      int dataElementFromQueue = -1; 
      while (dataElementFromQueue < 1000) 
      { 
       lock (_myQueue) 
       { 
        if (_myQueue.Count > 0) 
        { 
         dataElementFromQueue = _myQueue.Dequeue(); 

         // Do something with dataElementFromQueue... 
         System.Console.WriteLine("Dequeued " + dataElementFromQueue); 
        } 
       } 
      } 
      System.Console.WriteLine("END PopFromQueue"); 

     } 
    } 
} 
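
The consumer loop above keeps taking the lock and polling even when the queue is empty. If that matters, one option is to let the consumer sleep until an item arrives, using Monitor.Wait and Monitor.Pulse. The sketch below shows the idea; the class name `BlockingQueue<T>` is hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical blocking queue built on Monitor.Wait / Monitor.Pulse (sketch).
public sealed class BlockingQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _gate = new object();

    public void Enqueue(T item)
    {
        lock (_gate)
        {
            _queue.Enqueue(item);
            Monitor.Pulse(_gate); // wake one waiting consumer, if any
        }
    }

    // Blocks until an item is available.
    public T Dequeue()
    {
        lock (_gate)
        {
            // Wait releases the lock while sleeping and reacquires it on wake-up;
            // the loop re-checks the condition because another consumer may have
            // taken the item first.
            while (_queue.Count == 0)
                Monitor.Wait(_gate);

            return _queue.Dequeue();
        }
    }
}
```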
8

If you need thread safety, use ConcurrentQueue<T>.

+0

Just note that ConcurrentQueue is available in .NET Framework 4 and above; if you are still targeting framework 3.5 or lower, it is not available to you. – Will 2013-06-18 20:30:19
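
For the producer/consumer setup in the question, a ConcurrentQueue<T> version might look like the sketch below. The class `ConcurrentQueueDemo` and its `Run` method are hypothetical names used only for this example; `ConcurrentQueue<T>` lives in System.Collections.Concurrent.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical demo: one thread enqueues, another dequeues, both run concurrently.
public static class ConcurrentQueueDemo
{
    // Returns the number of items the consumer received.
    public static int Run(int count)
    {
        var queue = new ConcurrentQueue<int>();
        int received = 0;

        var producer = new Thread(() =>
        {
            for (int i = 0; i < count; i++)
                queue.Enqueue(i);
        });

        var consumer = new Thread(() =>
        {
            while (received < count)
            {
                int item;
                if (queue.TryDequeue(out item))
                    received++;      // process 'item' here
                else
                    Thread.Yield();  // queue is empty right now; let other threads run
            }
        });

        producer.Start();
        consumer.Start();
        producer.Join();
        consumer.Join();
        return received;
    }
}
```

No explicit lock is needed here because Enqueue and TryDequeue are each thread-safe on their own.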