我想創建一個「消息泵」,就像在一個線程上運行的自定義同步上下文。Nito.AsyncEx異步生產者/消費者隊列不處理
程序是Silverlight 5,同步隊列來自Stephen Cleary的Nito.AsyncEx nuget(v3.0.1)。
代碼(遺憾的長度,評論/調試有意包含):
public sealed class ThreadSynchronizationContext : SynchronizationContext, IDisposable
{
/// <summary>The queue of work items.</summary>
private readonly AsyncProducerConsumerQueue<KeyValuePair<SendOrPostCallback, object>> syncQueue =
new AsyncProducerConsumerQueue<KeyValuePair<SendOrPostCallback, object>>();
private readonly Thread thread = null;
public ThreadSynchronizationContext()
{
Debug.WriteLine("------------------------");
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - starting worker thread sync context!");
Debug.WriteLine("------------------------");
// using this hack so the new thread will start running before this function returns
using (var hack = new ManualResetEvent(false))
{
thread = new Thread(async obj =>
{
SetSynchronizationContext(obj as SynchronizationContext);
hack.Set();
try
{
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - awaiting queue available...");
while (await syncQueue.OutputAvailableAsync())
{
Debug.WriteLine("awaiting queue item...");
var workItem = await syncQueue.TryDequeueAsync();
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue item received!");
if (workItem.Success)
{
workItem.Item.Key(workItem.Item.Value);
}
}
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue finished :(");
}
catch (ObjectDisposedException e)
{
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue exception :((");
}
});
thread.Start(this);
hack.WaitOne();
Debug.WriteLine("worker thread: " + WorkerThreadId);
}
}
public int WorkerThreadId { get { return thread.ManagedThreadId; } }
public void Dispose()
{
syncQueue.Dispose();
}
/// <summary>Dispatches an asynchronous message to the synchronization context.</summary>
/// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param>
/// <param name="state">The object passed to the delegate.</param>
public async override void Post(SendOrPostCallback d, object state)
{
if (d == null) throw new ArgumentNullException("d");
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - enqueuing item...");
await syncQueue.EnqueueAsync(new KeyValuePair<SendOrPostCallback, object>(d, state));
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - item enqueued.");
}
/// <summary>Dispatches a synchronous message to the synchronization context.</summary>
/// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param>
/// <param name="state">The object passed to the delegate.</param>
public override void Send(SendOrPostCallback d, object state)
{
if (d == null) throw new ArgumentNullException("d");
using (var handledEvent = new ManualResetEvent(false))
{
Post(SendOrPostCallback_BlockingWrapper, Tuple.Create(d, state, handledEvent));
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - waiting for blocking wrapper!");
handledEvent.WaitOne();
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - blocking wrapper finished.");
}
}
private static void SendOrPostCallback_BlockingWrapper(object state)
{
var innerCallback = (state as Tuple<SendOrPostCallback, object, ManualResetEvent>);
try
{
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - call callback from blocking wrapper...");
innerCallback.Item1(innerCallback.Item2);
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - blocking wrapper callback finished.");
}
finally
{
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - setting handle from blocking wrapper!");
innerCallback.Item3.Set();
}
}
}
問題:
當我啓動應用程序,並發表了幾個代表到上下文,這是輸出:
------------------------
thread 1 - starting worker thread sync context!
------------------------
thread 17 - awaiting queue available...
worker thread: 17
thread 1 - enqueuing item...
thread 8 - enqueuing item...
thread 8 - item enqueued.
thread 1 - item enqueued.
thread 1 - waiting for blocking wrapper!
基本上,程序凍結在李Send()
方法的ne handledEvent.WaitOne();
,好像隊列從未開始處理添加的項目。
我有點難倒,任何指導表示讚賞。