2012-08-24 72 views
4

假設我有10個項目(我需要通過http協議獲取它們),代碼N中的任務開始獲取數據,每個任務依次需要10個項目。我把這些物品放在ConcurrentQueue<Item>。之後,這些項目將逐個以線程不安全的方式處理。異步/等待任務和WaitHandle

async Task<Item> GetItemAsync() 
{ 
    //fetch one item from the internet 
} 

async Task DoWork() 
{ 
    var tasks = new List<Task>(); 
    var items = new ConcurrentQueue<Item>(); 
    var handles = new List<ManualResetEvent>(); 

    for i 1 -> N 
    { 
     var handle = new ManualResetEvent(false); 
     handles.Add(handle); 

     tasks.Add(Task.Factory.StartNew(async delegate 
     { 
      for j 1 -> 10 
      { 
       var item = await GetItemAsync(); 
       items.Enqueue(item); 
      } 
      handle.Set(); 
     }); 
    } 

    //begin to process the items when any handle is set 
    WaitHandle.WaitAny(handles); 

    while(true) 
    { 
     if (all handles are set && items collection is empty) //*** 
      break; 
     //in another word: all tasks are really completed 

     while(items.TryDequeue(out item))   
     { 
       AThreadUnsafeMethod(item); //process items one by one 
     } 
    } 
} 

我不知道條件是否可以放在標記爲***的語句中。我不能在這裏使用Task.IsCompleted屬性,因爲我在任務中使用了await,所以任務很快完成。而一個bool[]表示任務執行到底是否看起來真的很難看,因爲我認爲ManualResetEvent可以做同樣的工作。任何人都可以給我一個建議嗎?

回答

3

嗯,你能自己的身材,但我覺得它更容易與TPL Dataflow噸。

喜歡的東西:

static async Task DoWork() 
{ 
    // By default, ActionBlock uses MaxDegreeOfParallelism == 1, 
    // so AThreadUnsafeMethod is not called in parallel. 
    var block = new ActionBlock<Item>(AThreadUnsafeMethod); 

    // Start off N tasks, each asynchronously acquiring 10 items. 
    // Each item is sent to the block as it is received. 
    var tasks = Enumerable.Range(0, N).Select(Task.Run(
     async() => 
     { 
     for (int i = 0; i != 10; ++i) 
      block.Post(await GetItemAsync()); 
     })).ToArray(); 

    // Complete the block when all tasks have completed. 
    Task.WhenAll(tasks).ContinueWith(_ => { block.Complete(); }); 

    // Wait for the block to complete. 
    await block.Completion; 
} 
1

你可以用一個超時時間爲零的WaitOne來檢查狀態。像這樣的東西應該工作:

if (handles.All(handle => handle.WaitOne(TimeSpan.Zero)) && !items.Any()) 
    break; 

http://msdn.microsoft.com/en-us/library/cc190477.aspx

+0

「阻塞當前線程,直到當前實例接收到信號」,這樣您將有10個阻塞線程等待信號。 – Sebastian

+1

@Sebastian「備註:如果超時爲零,則該方法不會阻塞,它將測試等待句柄的狀態並立即返回。」 – jaggedSpire

+0

@jaggedSpire好吧,這是我錯過的信息。謝謝,現在這是我首選的檢查WaitHandles的解決方案。 – Sebastian

0

感謝所有。最後我發現CountDownEvent非常適合這種情況。一般實現如下所示:(對於其他人的信息)

for i 1 -> N 
{ 
    //start N tasks 
    //invoke CountDownEvent.Signal() at the end of each task 
} 

//see if CountDownEvent.IsSet here