2013-04-24 114 views
0

我必須在 .NET 3.5 下實現多個線程(目前至少 3 個)之間的通信,而且每個線程既是生產者又是消費者。與其在每一對線程之間單獨發送信號,我的想法是實現一個消息隊列,用來存儲如下這類消息(即一個供多個生產者和消費者共用的隊列):

/// <summary>The kinds of message that a <c>QueuedSignal</c> can carry.</summary>
enum Signals
{
    ObjectArrivedOnLightBarrier,
    ObjectLeftLightBarrier,
    CodeFound
}

/// <summary>Identifies a unit that can act as the producer or the consumer of a <c>QueuedSignal</c>.</summary>
enum UnitID
{
    GrabThread,
    ImageProcessingThread,
    SaveThread
}

/// <summary>A message stored in the shared queue: what happened, who reported it and who it is for.</summary>
/// <remarks>
/// Consumer shows who the message is intended for (and only that unit is allowed to remove it from the queue).
///
/// The members are <c>internal</c> rather than <c>public</c> because <c>Signals</c> and <c>UnitID</c> are declared
/// without an access modifier (and are therefore internal); a public member of an internal type would not compile.
/// Previously the members had no modifier at all, which made them private and the class unable to carry any data.
/// </remarks>
public class QueuedSignal
{
    /// <summary>The signal being reported.</summary>
    internal Signals Message { get; set; }

    /// <summary>The unit that put this message into the queue.</summary>
    internal UnitID Producer { get; set; }

    /// <summary>The unit this message is intended for.</summary>
    internal UnitID Consumer { get; set; }
}

我的想法是:任何線程都可以查看(Peek)隊列中的第一項,如果該消息不是發給它的,就把它留在隊列中不動(即使隊列中還有少量其他消息、而其中某一條可能是發給這個線程的,也不成問題)。

當有多個生產者和消費者時,Queue 是線程安全的嗎?

回答

2

Queue<T> 不是線程安全的。

如果您使用的是 .NET 4 或更高版本,我會建議您使用 BlockingCollection<T>。

不幸的是,您用不了它(您使用的是 .NET 3.5),不過網上流傳着好幾個併發隊列的實現。

可以看看 Marc Gravell 的回答中給出的那個實現。不幸的是,它沒有 Peek() 方法。

下面是 .NET 4 發佈之前我用過的一個類,也許您會感興趣。它算不上最好的實現;在 .NET 4 發佈之前,我們更多地是把它當作臨時替代品來用。儘管如此,代碼如下:

/// <summary>A bounded blocking queue.</summary>
/// <remarks>
/// All access to the underlying <see cref="Queue{T}"/> is serialised by locking on it.
/// Two semaphores count the free slots and the stored items so that <see cref="Enqueue"/> blocks while the
/// queue is full and <see cref="Dequeue"/> blocks while it is empty. <see cref="Stop"/> wakes both kinds of waiter.
/// </remarks>
/// <typeparam name="T">The element type of the queue, which must be a reference type.</typeparam>

public sealed class BoundedBlockingQueue<T>: IDisposable where T: class
{
    #region Construction and disposal

    /// <summary>Constructor.</summary>
    /// <param name="maxQueueSize">
    /// The maximum size of the queue.
    /// Calls to <see cref="Enqueue"/> when the queue is full will block until at least one item has been removed.
    /// Calls to <see cref="Dequeue"/> when the queue is empty will block until a new item is enqueued.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is zero or negative.</exception>

    public BoundedBlockingQueue(int maxQueueSize)
    {
        if (maxQueueSize <= 0)
        {
            throw new ArgumentOutOfRangeException("maxQueueSize");
        }

        _queue                = new Queue<T>(maxQueueSize);
        _itemsAvailable       = new Semaphore(0, maxQueueSize);
        _spaceAvailable       = new Semaphore(maxQueueSize, maxQueueSize);
        _queueStopped         = new ManualResetEvent(false);
        _queueStoppedAndEmpty = new ManualResetEvent(false);

        // _queueStopped is deliberately first in both arrays: WaitAny() reports the lowest signalled index,
        // so once the queue is stopped the wait returns 0 and no semaphore permit is taken.
        _stoppedOrItemAvailable  = new WaitHandle[] { _queueStopped, _itemsAvailable };
        _stoppedOrSpaceAvailable = new WaitHandle[] { _queueStopped, _spaceAvailable };
    }

    /// <summary>Disposal. Closes all the wait handles; calling it more than once is harmless.</summary>

    public void Dispose()
    {
        if (_itemsAvailable != null)
        {
            _itemsAvailable.Close();
            _spaceAvailable.Close();
            _queueStopped.Close();
            _queueStoppedAndEmpty.Close();
            _itemsAvailable = null;   // Use _itemsAvailable as a flag to indicate that Dispose() has been called.
        }
    }

    #endregion Construction and disposal

    #region Public properties

    /// <summary>The number of items currently in the queue.</summary>

    public int Count
    {
        get
        {
            throwIfDisposed();

            lock (_queue)
            {
                return _queue.Count;
            }
        }
    }

    /// <summary>Has <see cref="Stop"/> been called?</summary>

    public bool Stopped
    {
        get
        {
            throwIfDisposed();
            return _stopped;
        }
    }

    #endregion Public properties

    #region Public methods

    /// <summary>
    /// Signals that new items will no longer be placed into the queue.
    /// After this is called, calls to <see cref="Dequeue"/> will return null immediately if the queue is empty.
    /// Before this is called, calls to <see cref="Dequeue"/> will block if the queue is empty.
    /// Attempting to enqueue items after this has been called will cause an exception to be thrown.
    /// </summary>
    /// <remarks>
    /// Threads that are blocked inside <see cref="Enqueue"/> waiting for space are woken up and their call throws
    /// <see cref="InvalidOperationException"/>; their item is NOT added. No item can enter the queue once this
    /// method has returned, so a queue that is stopped and empty stays that way.
    ///
    /// If the queue is already empty, waiters in <see cref="WaitUntilStoppedAndEmpty()"/> are released immediately
    /// rather than having to wait for some thread to call <see cref="Dequeue"/> again.
    /// </remarks>

    public void Stop()
    {
        throwIfDisposed();

        lock (_queue)
        {
            // Set the flag before the event so that any thread woken by the event also observes the flag.
            _stopped = true;
            _queueStopped.Set();

            if (_queue.Count == 0)
            {
                markStoppedAndEmpty();
            }
        }
    }

    /// <summary>
    /// Returns the front item of the queue without removing it, or null if the queue is currently empty.
    /// A null return does NOT indicate that <see cref="Stop"/> has been called.
    /// This never blocks.
    /// </summary>
    /// <returns>The front item of the queue, or null if the queue is empty.</returns>

    public T Peek()
    {
        throwIfDisposed();

        lock (_queue)
        {
            return (_queue.Count > 0) ? _queue.Peek() : null;
        }
    }

    /// <summary>
    /// Enqueues a new non-null item.
    /// If there is no room in the queue, this will block until there is room.
    /// An exception will be thrown if <see cref="Stop"/> has been called.
    /// </summary>
    /// <param name="item">The item to be enqueued. This may not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// <see cref="Stop"/> was called before the call, or while the call was blocked waiting for space.
    /// </exception>

    public void Enqueue(T item)
    {
        throwIfDisposed();

        if (item == null)
        {
            throw new ArgumentNullException("item");
        }

        // Cheap early-out; enqueue() checks again under the lock to close the race with Stop().
        if (_stopped)
        {
            throw new InvalidOperationException(StoppedMessage);
        }

        this.enqueue(item);
    }

    /// <summary>
    /// Dequeues the next available item.
    /// If <see cref="Stop"/> has been called, this returns null if the queue is empty.
    /// Otherwise it blocks until an item becomes available (or <see cref="Stop"/> is called).
    /// </summary>
    /// <returns>The next available item, or null if the queue is empty and stopped.</returns>

    public T Dequeue()
    {
        throwIfDisposed();

        if (_isQueueStoppedAndEmpty)
        {
            return null;
        }

        WaitHandle.WaitAny(_stoppedOrItemAvailable);

        lock (_queue)
        {
            if (_stopped && (_queue.Count == 0))
            {
                markStoppedAndEmpty();
                return null;
            }

            T item = _queue.Dequeue();
            _spaceAvailable.Release();

            // Signal as soon as the last item leaves a stopped queue, instead of waiting for one more Dequeue() call.
            if (_stopped && (_queue.Count == 0))
            {
                markStoppedAndEmpty();
            }

            return item;
        }
    }

    /// <summary>Waits forever for the queue to become empty AND stopped.</summary>

    public void WaitUntilStoppedAndEmpty()
    {
        throwIfDisposed();
        WaitUntilStoppedAndEmpty(Timeout.Infinite);
    }

    /// <summary>Waits up to the specified time for the queue to become empty AND stopped.</summary>
    /// <param name="timeoutMilliseconds">The maximum wait time, in milliseconds.</param>
    /// <returns>True if the wait succeeded, false if it timed-out.</returns>

    public bool WaitUntilStoppedAndEmpty(int timeoutMilliseconds)
    {
        throwIfDisposed();
        return _queueStoppedAndEmpty.WaitOne(timeoutMilliseconds);
    }

    #endregion Public methods

    #region Private methods

    /// <summary>Waits for a free slot and enqueues the item, unless the queue is stopped first.</summary>
    /// <param name="item">An item to enqueue.</param>
    /// <exception cref="InvalidOperationException">The queue was stopped before the item could be added.</exception>

    private void enqueue(T item)
    {
        // Index 0 is _queueStopped: a producer blocked on a full queue is released by Stop() instead of hanging.
        if (WaitHandle.WaitAny(_stoppedOrSpaceAvailable) == 0)
        {
            throw new InvalidOperationException(StoppedMessage);
        }

        lock (_queue)
        {
            // Stop() may have run between acquiring the slot and taking the lock.
            if (_stopped)
            {
                _spaceAvailable.Release();   // Hand back the slot that was just acquired.
                throw new InvalidOperationException(StoppedMessage);
            }

            _queue.Enqueue(item);

            // Released inside the lock so that the permit count can never run ahead of the item count;
            // otherwise dequeues made after Stop() (which take no permit) could push it past its maximum.
            _itemsAvailable.Release();
        }
    }

    /// <summary>Records that the queue is stopped and empty and releases any waiters. Call with the lock held.</summary>

    private void markStoppedAndEmpty()
    {
        _isQueueStoppedAndEmpty = true;
        _queueStoppedAndEmpty.Set();
    }

    /// <summary>Throws if this object has been disposed.</summary>

    private void throwIfDisposed()
    {
        if (_itemsAvailable == null)
        {
            throw new ObjectDisposedException(this.GetType().FullName);
        }
    }

    #endregion Private methods

    #region Fields

    /// <summary>Message of the exception thrown when enqueuing to a stopped queue.</summary>

    private const string StoppedMessage = "Attempting to enqueue an item to a stopped queue.";

    /// <summary>
    /// Contains wait handles for "stopped" and "item available".
    /// Therefore using this for WaitAny() will wait until the queue is stopped
    /// or an item becomes available, whichever is the sooner.
    /// </summary>

    private readonly WaitHandle[] _stoppedOrItemAvailable;

    /// <summary>
    /// Contains wait handles for "stopped" and "space available".
    /// Therefore using this for WaitAny() will wait until the queue is stopped
    /// or a slot becomes free, whichever is the sooner.
    /// </summary>

    private readonly WaitHandle[] _stoppedOrSpaceAvailable;

    private Semaphore _itemsAvailable;

    private volatile bool _stopped;
    private volatile bool _isQueueStoppedAndEmpty;

    private readonly Queue<T> _queue;
    private readonly Semaphore _spaceAvailable;
    private readonly ManualResetEvent _queueStopped;
    private readonly ManualResetEvent _queueStoppedAndEmpty;

    #endregion Fields
}

下面是一個舊的單元測試。它算不上好的單元測試:一次測試的東西太多,還有其他一些問題,但它可以演示如何使用這個隊列:

/// <summary>
/// Exercises <c>BoundedBlockingQueue</c>: filling it without blocking, blocking on enqueue when full,
/// FIFO ordering, and blocking on dequeue when empty.
/// </summary>
[TestMethod]

public void TestBoundedBlockingQueue()
{
    int maxQueueSize = 8;

    using (var queue = new BoundedBlockingQueue<string>(maxQueueSize))
    {
        // Fill the queue, but don't block.

        for (int i = 0; i < maxQueueSize; ++i)
        {
            int start1 = DateTimeFunctions.TickCount;
            queue.Enqueue(i.ToString());
            int elapsed1 = DateTimeFunctions.TickCount - start1;
            Assert.IsTrue(elapsed1 < 100, "Took too long to enqueue an item."); // Shouldn't have taken more than 100 ms to enqueue the item.
        }

        // Now if we try to enqueue something we should block (since the queue should be full).
        // We can detect this by starting a thread that will dequeue something after sleepTime milliseconds
        // and then seeing how long the main thread took to enqueue something.
        // It should have taken around sleepTime (+/- tolerance).

        int sleepTime = 2500;
        int tolerance = 500;
        Worker.Run(()=>{Thread.Sleep(sleepTime); queue.Dequeue();}, "TestBoundedBlockingQueue Dequeue()");
        int start2 = DateTimeFunctions.TickCount;
        queue.Enqueue(maxQueueSize.ToString());
        int elapsed2 = DateTimeFunctions.TickCount - start2;
        Assert.IsTrue(Math.Abs(elapsed2 - sleepTime) <= tolerance, "Didn't take the right time to enqueue an item.");

        // Now verify that the remaining items in the queue are the expected ones,
        // i.e. from "1".."maxQueueSize" (since the first item, "0", has been dequeued).

        for (int i = 1; i <= maxQueueSize; ++i)
        {
            Assert.AreEqual(i.ToString(), queue.Dequeue(), "Incorrect item dequeued.");
        }

        Assert.AreEqual(0, queue.Count);

        // Now if we try to dequeue something we should block (since the queue is empty).
        // We can detect this by starting a thread that will enqueue something after sleepTime milliseconds
        // and then seeing how long the main thread took to dequeue something.
        // It should have taken around sleepTime (+/- tolerance).

        string testValue = "TEST";
        Worker.Run(()=>{Thread.Sleep(sleepTime); queue.Enqueue(testValue);}, "TestBoundedBlockingQueue Enqueue()");
        start2 = DateTimeFunctions.TickCount;
        Assert.AreEqual(testValue, queue.Dequeue(), "Incorrect item dequeued");
        elapsed2 = DateTimeFunctions.TickCount - start2;

        // This step times a dequeue; the failure message previously said "enqueue" (copy-paste slip).
        Assert.IsTrue(Math.Abs(elapsed2 - sleepTime) <= tolerance, "Didn't take the right time to dequeue an item.");
    }
}
相關問題