2010-06-09 68 views
1

我剛開始探索PTL並有一個設計問題。並行任務庫WaitAny Design

我的情景: 我有一個URL列表,每個指向一個圖像。我希望每個圖像都可以並行下載。只要至少有一個圖像被下載,我想要執行一個方法來處理下載的圖像。該方法不應該並行 - 它應該是串行的。

我認爲以下方法可行,但我不確定這是否正確。因爲我有單獨的類來收集圖像和爲收集的圖像做「事情」,所以我最終傳遞了一系列任務,這似乎是錯誤的,因爲它暴露了圖像檢索的內部工作原理。但我不知道解決方法。實際上,這兩種方法都有更多,但這對此並不重要。只要知道他們真的不應該被集中到一個既能檢索圖像又能處理圖像的大型方法。

//From the Director class 
Task<Image>[] downloadTasks = collector.RetrieveImages(listOfURLs); 

for (int i = 0; i < listOfURLs.Count; i++) 
{ 
    //Wait for any of the remaining downloads to complete 
    int completedIndex = Task<Image>.WaitAny(downloadTasks); 
    Image completedImage = downloadTasks[completedIndex].Result; 

    //Now do something with the image (this "something" must happen serially) 
    //Uses the "Formatter" class to accomplish this let's say 
} 

/////////////////////////////////////////////////// 

//From the Collector class 
public Task<Image>[] RetrieveImages(List<string> urls) 
{ 
    Task<Image>[] tasks = new Task<Image>[urls.Count]; 

    int index = 0; 
    foreach (string url in urls) 
    { 
     string lambdaVar = url; //Required... Bleh 
     tasks[index] = Task<Image>.Factory.StartNew(() => 
      { 
       using (WebClient client = new WebClient()) 
       { 
        //TODO: Replace with live image locations 
        string fileName = String.Format("{0}.png", i); 
        client.DownloadFile(lambdaVar, Path.Combine(Application.StartupPath, fileName)); 
       } 

       return Image.FromFile(Path.Combine(Application.StartupPath, fileName)); 
      }, 
      TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent); 

     index++; 
    } 

    return tasks; 
} 

回答

9

通常,您使用了WaitAny等待一個任務,當你不關心結果之前將其旋轉任何其他人。例如,如果你只是關心發生的第一個圖像返回。

這是怎麼回事。

這會創建兩個任務,一個加載圖像並將其添加到阻塞集合中。第二個任務等待集合並處理添加到隊列中的所有圖像。當所有的圖像被加載時,第一個任務關閉隊列,所以第二個任務可以關閉。

using System; 
using System.Collections.Concurrent; 
using System.Collections.Generic; 
using System.Drawing; 
using System.IO; 
using System.Net; 
using System.Threading.Tasks; 

namespace ClassLibrary1 
{ 
    public class Class1 
    { 
     readonly string _path = Directory.GetCurrentDirectory(); 

     public void Demo() 
     { 
      IList<string> listOfUrls = new List<string>(); 
      listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif"); 
      listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif"); 
      listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif"); 
      listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif"); 
      listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif"); 
      listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif"); 
      listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif"); 
      listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif"); 
      listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif"); 

      BlockingCollection<Image> images = new BlockingCollection<Image>(); 

      Parallel.Invoke(
       () =>     // Task 1: load the images 
       { 
        Parallel.For(0, listOfUrls.Count, (i) => 
         { 
          Image img = RetrieveImages(listOfUrls[i], i); 
          img.Tag = i; 
          images.Add(img); // Add each image to the queue 
         }); 
        images.CompleteAdding(); // Done with images. 
       }, 
       () =>     // Task 2: Process images serially 
       { 
        foreach (var img in images.GetConsumingEnumerable()) 
        { 
         string newPath = Path.Combine(_path, String.Format("{0}_rot.png", img.Tag)); 
         Console.WriteLine("Rotating image {0}", img.Tag); 
         img.RotateFlip(RotateFlipType.RotateNoneFlipXY); 

         img.Save(newPath); 
        } 
       }); 
     } 

     public Image RetrieveImages(string url, int i) 
     { 
      using (WebClient client = new WebClient()) 
      { 
       string fileName = Path.Combine(_path, String.Format("{0}.png", i)); 
       Console.WriteLine("Downloading {0}...", url); 
       client.DownloadFile(url, Path.Combine(_path, fileName)); 
       Console.WriteLine("Saving {0} as {1}.", url, fileName); 
       return Image.FromFile(Path.Combine(_path, fileName)); 
      } 
     } 
    } 
} 

警告:該代碼沒有任何錯誤檢查或取消。現在已經很晚了,你需要做些什麼嗎? :)

這是管道模式的一個例子。它假定獲取圖像的速度非常慢,並且鎖定收集內部的鎖定成本不會引起問題,因爲與下載圖像花費的時間相比,它發生的頻率相對較低。

我們的書......你可以在http://parallelpatterns.codeplex.com/ 第7章閱讀更多關於這一點,其他模式來進行並行編程涵蓋管道和伴隨的實例表明,錯誤處理和消除管道。

+1

知識一天的寶石。感謝Ade。很棒的信息 – 2012-02-10 17:27:13

0

這樣做很可能是通過實現Observer模式的最佳方式:有你RetreiveImages功能實現IObservable,把你的「已完成形象行動」爲IObserver對象的OnNext方法,並將其訂閱到RetreiveImages

我自己還沒有嘗試過這個(仍然需要更多地使用任務庫),但我認爲這是做到這一點的「正確」方式。

+0

我想可能是因爲我在努力學習的同時,PTL和Observer模式,但我似乎無法得到它的權利。如果我有Collector類(如圖所示),Director類(頂級代碼片段)和Formatter類(做「某事」),我將如何實現這一點? – colithium 2010-06-09 08:00:22

2

當另一個任務完成時,TPL已經提供了ContinueWith功能來執行一項任務。任務鏈是TPL用於異步操作的主要模式之一。

下面的方法下載一組圖像,並通過重命名每個文件

static void DownloadInParallel(string[] urls) 
{ 
    var tempFolder = Path.GetTempPath(); 

    var downloads = from url in urls 
        select Task.Factory.StartNew<string>(() =>{ 
         using (var client = new WebClient()) 
         { 
          var uri = new Uri(url); 
          string file = Path.Combine(tempFolder,uri.Segments.Last()); 
          client.DownloadFile(uri, file); 
          return file; 
         } 
        },TaskCreationOptions.LongRunning|TaskCreationOptions.AttachedToParent) 
        .ContinueWith(t=>{ 
         var filePath = t.Result; 
         File.Move(filePath, filePath + ".test"); 
        },TaskContinuationOptions.ExecuteSynchronously); 

    var results = downloads.ToArray(); 
    Task.WaitAll(results); 
} 

的您也應該檢查從ParallelExtensionsExtras樣品WebClient Async Tasks繼續。 DownloadXXXTask擴展方法處理任務的創建和文件的異步下載。

下面的方法使用DownloadDataTask擴展獲得圖像的數據,並將其保存到磁盤

static void DownloadInParallel2(string[] urls) 
{ 
    var tempFolder = Path.GetTempPath(); 

    var downloads = from url in urls 
     let uri=new Uri(url) 
     let filePath=Path.Combine(tempFolder,uri.Segments.Last()) 
     select new WebClient().DownloadDataTask(uri)               
     .ContinueWith(t=>{ 
      var img = Image.FromStream(new MemoryStream(t.Result)); 
      img.RotateFlip(RotateFlipType.RotateNoneFlipY); 
      img.Save(filePath); 
     },TaskContinuationOptions.ExecuteSynchronously); 

    var results = downloads.ToArray(); 
    Task.WaitAll(results); 
} 
+0

兩件事,我不認爲TaskContinuationOptions.ExecuteSynchronously做我所需要的。這個「東西」(在你的例子中移動文件)不能同時發生在多個線程上。讓我們假裝不是移動文件,而是通過串行電纜與設備進行通信。 兩個,就像我所說的,它比我簡化它更多。我認爲將這兩項任務合併爲同一種方法並不合適。但是這迫使我繞過任務,這似乎是一個不好的模式。 – colithium 2010-06-09 22:02:52

+0

你稱之爲壞模式是TPL的實際設計理念。在這件事上,它實際上非常接近F#。其次,ExecuteSynchronously意味着繼續將使用與之前的任務相同的線程運行。最後,你不會將任務合併到同一個方法中。傳遞給任務或延續的lambda是另一個匿名函數。您可以輕鬆地傳遞方法名稱而不是使用lambda。 如果你發現TPL的工作方式不舒服,你應該看看不同的庫或模式,而不是試圖對付它。 – 2010-06-10 08:34:29

+0

這很好,可能是答案。如果是這種情況,你會得到你的回票並被接受。 至於我的兩點,雖然我認爲我們有溝通的細分。 1)ExecuteSynchronously意味着延續可能同時在幾個線程上運行。在我的情況下,延續只能在任何給定時間在一個線程上運行。 2)我知道它在技術上是它自己的方法,我也可以放入方法調用。但是這兩件事實際上是兩個完全獨立的類,它們甚至不應該彼此瞭解。因此,我的困境。 – colithium 2010-06-10 10:03:19

0

//下載所有圖像

private async void GetAllImages() 
{ 
    var downloadTasks = listOfURLs.Where(url => !string.IsNullOrEmpty(url)).Select(async url => 
      { 
       var ret = await RetrieveImage(url); 
       return ret; 
     }).ToArray(); 

     var counts = await Task.WhenAll(downloadTasks); 
} 

//From the Collector class 
public async Task<Image> RetrieveImage(string url) 
{ 
    var lambdaVar = url; //Required... Bleh 
    using (WebClient client = new WebClient()) 
    { 
     //TODO: Replace with live image locations 
     var fileName = String.Format("{0}.png", i); 
     await client.DownloadFile(lambdaVar, Path.Combine(Application.StartupPath, fileName)); 
    } 
    return Image.FromFile(Path.Combine(Application.StartupPath, fileName)); 
}