2015-10-06 127 views
0

我想創建一個「消息泵」,就像在一個線程上運行的自定義同步上下文。Nito.AsyncEx異步生產者/消費者隊列不處理

程序是Silverlight 5,同步隊列來自Stephen Cleary的Nito.AsyncEx nuget(v3.0.1)。

代碼(遺憾的長度,評論/調試有意包含):

public sealed class ThreadSynchronizationContext : SynchronizationContext, IDisposable 
{ 

    /// <summary>The queue of work items.</summary> 
    private readonly AsyncProducerConsumerQueue<KeyValuePair<SendOrPostCallback, object>> syncQueue = 
     new AsyncProducerConsumerQueue<KeyValuePair<SendOrPostCallback, object>>(); 

    private readonly Thread thread = null; 

    public ThreadSynchronizationContext() 
    { 
     Debug.WriteLine("------------------------"); 
     Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - starting worker thread sync context!"); 
     Debug.WriteLine("------------------------"); 

     // using this hack so the new thread will start running before this function returns 
     using (var hack = new ManualResetEvent(false)) 
     { 
      thread = new Thread(async obj => 
      { 
       SetSynchronizationContext(obj as SynchronizationContext); 

       hack.Set(); 

       try 
       { 
        Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - awaiting queue available..."); 
        while (await syncQueue.OutputAvailableAsync()) 
        { 
         Debug.WriteLine("awaiting queue item..."); 
         var workItem = await syncQueue.TryDequeueAsync(); 
         Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue item received!"); 
         if (workItem.Success) 
         { 
          workItem.Item.Key(workItem.Item.Value); 
         } 
        } 

        Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue finished :("); 
       } 
       catch (ObjectDisposedException e) 
       { 
        Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue exception :(("); 
       } 
      }); 
      thread.Start(this); 

      hack.WaitOne(); 

      Debug.WriteLine("worker thread: " + WorkerThreadId); 
     } 
    } 

    public int WorkerThreadId { get { return thread.ManagedThreadId; } } 

    public void Dispose() 
    { 
     syncQueue.Dispose(); 
    } 

    /// <summary>Dispatches an asynchronous message to the synchronization context.</summary> 
    /// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param> 
    /// <param name="state">The object passed to the delegate.</param> 
    public async override void Post(SendOrPostCallback d, object state) 
    { 
     if (d == null) throw new ArgumentNullException("d"); 

     Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - enqueuing item..."); 
     await syncQueue.EnqueueAsync(new KeyValuePair<SendOrPostCallback, object>(d, state)); 
     Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - item enqueued."); 
    } 

    /// <summary>Dispatches a synchronous message to the synchronization context.</summary> 
    /// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param> 
    /// <param name="state">The object passed to the delegate.</param> 
    public override void Send(SendOrPostCallback d, object state) 
    { 
     if (d == null) throw new ArgumentNullException("d"); 

     using (var handledEvent = new ManualResetEvent(false)) 
     { 
      Post(SendOrPostCallback_BlockingWrapper, Tuple.Create(d, state, handledEvent)); 
      Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - waiting for blocking wrapper!"); 
      handledEvent.WaitOne(); 
      Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - blocking wrapper finished."); 
     } 
    } 

    private static void SendOrPostCallback_BlockingWrapper(object state) 
    { 
     var innerCallback = (state as Tuple<SendOrPostCallback, object, ManualResetEvent>); 
     try 
     { 
      Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - call callback from blocking wrapper..."); 
      innerCallback.Item1(innerCallback.Item2); 
      Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - blocking wrapper callback finished."); 
     } 
     finally 
     { 
      Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - setting handle from blocking wrapper!"); 
      innerCallback.Item3.Set(); 
     } 
    } 
} 

問題:

當我啓動應用程序,並發表了幾個代表到上下文,這是輸出:

------------------------ 
thread 1 - starting worker thread sync context! 
------------------------ 
thread 17 - awaiting queue available... 
worker thread: 17 
thread 1 - enqueuing item... 
thread 8 - enqueuing item... 
thread 8 - item enqueued. 
thread 1 - item enqueued. 
thread 1 - waiting for blocking wrapper! 

基本上,程序凍結在李Send()方法的ne handledEvent.WaitOne();,好像隊列從未開始處理添加的項目。

我有點難倒,任何指導表示讚賞。

回答

2

這裏的問題是有點棘手,但有一個事實,即你會看到你的「入列項...」調試輸出兩次時,你只叫Send一次一個很好的線索。

實際發生的情況是自定義同步上下文正在被線程的主代表內的await s拾取。所以,它會嘗試將其隊列處理代碼排隊到它自己的隊列中。

進行分解:

  • 線程代表開始執行和命中await syncQueue.OutputAvailableAsync()線。
  • 此時,線程委託在當前同步上下文(ThreadSynchronizationContext的實例)中註冊其繼續,然後返回(導致線程退出)。
  • 當呼叫代碼調用Send時,它將一個項目排入隊列,導致OutputAvailableAsync完成。
  • 線程委託然後嘗試繼續執行Post到捕獲的ThreadSynchronizationContext

如果你想要一個單線程的同步上下文,那麼你根本就不應該有一個異步線程委託。所以,應該使用同步的API:

thread = new Thread(obj => 
{ 
    SetSynchronizationContext(obj as SynchronizationContext); 

    hack.Set(); 

    try 
    { 
     Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - awaiting queue available..."); 
     while (true) 
     { 
      Debug.WriteLine("awaiting queue item..."); 
      var workItem = syncQueue.TryDequeue(); 
      Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue item received!"); 
      if (!workItem.Success) 
       break; 
      workItem.Item.Key(workItem.Item.Value); 
     } 

     Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue finished :("); 
    } 
    catch (ObjectDisposedException e) 
    { 
     Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue exception :(("); 
    } 
}); 

其實,我建議避免async void完全,所以我建議作出Post同步方法也(它沒有執行其SendOrPostCallback它仍然是「異步」的意義立即委託;它是同步入列):

public override void Post(SendOrPostCallback d, object state) 
{ 
    if (d == null) throw new ArgumentNullException("d"); 

    Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - enqueuing item..."); 
    syncQueue.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state)); 
    Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - item enqueued."); 
} 

或者,你可以保存自己的這所有的痛苦,只是使用已經是AsyncEx的一部分的。 AsyncContextThread在內部使用其自己的單線程同步上下文。