2017-02-28 69 views
1

我一直在閱讀2個小時,我仍然感到困惑。有人說使用StartNew,有人說Task.Run有人說別的。我知道Task.Run給我一個編譯錯誤。簡單的並行任務及延續

我需要並行啓動多個任務,然後當每個任務成功完成時執行一個延續任務。知道什麼時候所有的阻塞都會有幫助。

以下是我有:

public void DoSomeWork(object workItem) 
    { 
     var tasks = new Task<ResultArgs>[_itemList.Count]; 

     for (int loopCnt = 0; loopCnt < _itemList.Count; loopCnt++) 
     { 
      tasks[loopCnt] = new Task<ResultArgs>.Run(() => 
      { 
       return _itemList[loopCnt].Analyze(workItem); 
      }); 
      tasks[loopCnt].ContinueWith(ReportResults, TaskContinuationOptions.ExecuteSynchronously); 
     } 
    } 

的編譯表示運行中不存在的任務。

很明顯,我有一些東西跑,但我不知道是什麼。

我該如何解決這個問題?

+2

'Task.Run'只在4.5中存在,而不在4.0中。 – VMAtm

回答

2

您可以使用async方法,也可以將項目流入數據流,以下代碼使用Tpl-dataflow來處理項目,將它們傳遞到第二個處理步驟,最後等待處理完成。

using NUnit.Framework; 
using System; 
using System.Collections.Concurrent; 
using System.Linq; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace AsyncProcessing { 

    [TestFixture] 
    public class PipelineTests { 

     [Test] 
     public async Task RunPipeline() { 
      var pipeline = new MyPipeline(); 
      var data = Enumerable.Range(0, 1000).Select(x => new WorkItem(x, x)); 

      foreach(var item in data) { 
       await pipeline.SendAsync(item); 
      } 

      pipeline.Complete(); 
      await pipeline.Completion; 

      //all processing complete    
     } 
    } 

    class MyPipeline { 

     private BufferBlock<WorkItem> inputBuffer; 
     private TransformBlock<WorkItem, WorkItem> analyzeBlock; 
     private TransformBlock<WorkItem, ResultArg> reportBlock; 
     private ActionBlock<ResultArg> postOutput; 

     public ConcurrentBag<ResultArg> OutputBuffer { get; } 
     public Task Completion { get { return postOutput.Completion; } } 

     public MyPipeline() { 
      OutputBuffer = new ConcurrentBag<ResultArg>(); 
      CreatePipeline(); 
      LinkPipeline(); 
     } 

     public void Complete() { 
      inputBuffer.Complete(); 
     } 

     public async Task SendAsync(WorkItem data) { 
      await inputBuffer.SendAsync(data); 
     } 

     public void CreatePipeline() { 
      var options = new ExecutionDataflowBlockOptions() { 
       MaxDegreeOfParallelism = Environment.ProcessorCount, 
       BoundedCapacity = 10 
      }; 

      inputBuffer = new BufferBlock<WorkItem>(options); 

      analyzeBlock = new TransformBlock<WorkItem, WorkItem>(item => { 
       //Anylyze item.... 
       return item; 
      }, options); 

      reportBlock = new TransformBlock<WorkItem, ResultArg>(item => { 
       //report your results, email.. db... etc. 
       return new ResultArg(item.JobId, item.WorkValue); 
      }, options); 

      postOutput = new ActionBlock<ResultArg>(item => { 
       OutputBuffer.Add(item); 
      }, options); 
     } 

     public void LinkPipeline() { 
      var options = new DataflowLinkOptions() { 
       PropagateCompletion = true, 
      }; 

      inputBuffer.LinkTo(analyzeBlock, options); 
      analyzeBlock.LinkTo(reportBlock, options); 
      reportBlock.LinkTo(postOutput, options); 
     } 
    } 

    public class WorkItem { 

     public int JobId { get; set; } 
     public int WorkValue { get; set; } 

     public WorkItem(int id, int workValue) { 
      this.JobId = id; 
      this.WorkValue = workValue; 
     } 
    } 

    public class ResultArg { 

     public int JobId { get; set; } 
     public int Result { get; set; } 

     public ResultArg(int id, int result) { 
      this.JobId = id; 
      this.Result = result; 
     } 
    } 
} 
+0

我最終使用了異步方法的建議。由於編譯器不是大驚小怪,所以我認爲它是正確的。我們會在什麼時候開始測試!我也會研究我以前從未見過的TPL數據流。 – AeroClassics

+0

請參閱數據流中的崩潰課程的編輯:)另請參見數據流介紹(http://blog.stephencleary.com/2012/09/introduction-to-dataflow-part-1.html) – JSteward

+0

謝謝!更多閱讀!斯蒂芬的東西通常很好,值得一讀。感謝您的鏈接! – AeroClassics

0

爲什麼不使用Parallel.ForEach循環。這是用於並行執行的任務,它可以使用多個線程和執行更快 Parallet.Foreach

但是,如果您正在做一些涉及鎖定的數據庫相關的輸入輸出操作,它可能會失敗。在這種情況下,我會建議在每個任務中保留一個返回類型,並根據前一個任務的返回類型啓用ext任務。

+0

我已經給了Parallel.ForEach一個想法,但被警告說它不會把我所有的任務都放在準備運行的隊列中。它會每次都限制數量,並且由於近乎實時的限制,我擔心吞吐量。 但是,這確實提醒我我需要擔心異常! – AeroClassics

+0

@AeroClassics'async'也有類似的限制,如果它使用defalt任務調度程序。 – VMAtm

+0

哦小提琴。這不是我真正想聽到的,但我需要知道的。謝謝! – AeroClassics