2014-10-28 68 views
13

我有這樣的代碼:如何並行運行LINQ'let'語句?

var list = new List<int> {1, 2, 3, 4, 5}; 

var result = from x in list.AsParallel() 
      let a = LongRunningCalc1(x) 
      let b = LongRunningCalc2(x) 
      select new {a, b}; 

比方說,LongRunningCalc方法各服1次。上面的代碼需要大約2秒的時間才能運行,因爲當並行操作5個元素的列表時,從let語句調用的兩個方法被順序調用。

但是,這些方法也可以安全地並行調用。他們顯然需要合併回select,但在此之前應該並行運行 - select應該等待它們。

有沒有辦法做到這一點?

回答

7

您將無法使用查詢語法或let操作,但你可以寫並行執行的每個項目多個操作的方法:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TFinal>(
    this ParallelQuery<T> query, 
    Func<T, TResult1> selector1, 
    Func<T, TResult2> selector2, 
    Func<TResult1, TResult2, TFinal> resultAggregator) 
{ 
    return query.Select(item => 
    { 
     var result1 = Task.Run(() => selector1(item)); 
     var result2 = Task.Run(() => selector2(item)); 
     return resultAggregator(result1.Result, result2.Result); 
    }); 
} 

這將使你寫:

var query = list.AsParallel() 
    .SelectAll(LongRunningCalc1, 
     LongRunningCalc2, 
     (a, b) => new {a, b}) 

你可以額外的並行操作添加過載以及:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TResult3, TFinal> 
    (this ParallelQuery<T> query, 
    Func<T, TResult1> selector1, 
    Func<T, TResult2> selector2, 
    Func<T, TResult3> selector3, 
    Func<TResult1, TResult2, TResult3, TFinal> resultAggregator) 
{ 
    return query.Select(item => 
    { 
     var result1 = Task.Run(() => selector1(item)); 
     var result2 = Task.Run(() => selector2(item)); 
     var result3 = Task.Run(() => selector3(item)); 
     return resultAggregator(
      result1.Result, 
      result2.Result, 
      result3.Result); 
    }); 
} 

它可以寫一個版本來處理一些在編譯的時候不知道選擇的,但要做到這一點,他們都需要計算出相同類型的值:

public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query, 
    IEnumerable<Func<T, TResult>> selectors) 
{ 
    return query.Select(item => selectors.AsParallel() 
      .Select(selector => selector(item)) 
      .AsEnumerable()); 
} 
public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query, 
    params Func<T, TResult>[] selectors) 
{ 
    return SelectAll(query, selectors); 
} 
+0

這不是我的專家領域,但你不需要等待任務的結果? – Magnus 2014-10-28 17:56:13

+0

@Magnus我已經是。 – Servy 2014-10-28 17:58:15

+0

Yeh調用'.Result'等待任務執行。我簡單地簡化了我的示例代碼,但這使我走上了正確的道路,感謝您的幫助。 – Lyall 2014-10-28 18:01:53

0

我會做到這一點使用微軟的反應框架(NuGet中的「Rx-Main」)。

這就是:

var result = 
    from x in list.ToObservable() 
    from a in Observable.Start(() => LongRunningCalc1(x)) 
    from b in Observable.Start(() => LongRunningCalc2(x)) 
    select new {a, b}; 

的好處是,你可以使用.Subscribe(...)方法訪問結果,因爲它們產生:

result.Subscribe(x => /* Do something with x.a and/or x.b */); 

超級簡單!