2016-11-29 75 views
2

我有一個簡單的程序,它遍歷作爲反饋枚舉器實現的無盡枚舉。我已經在TPL和PLINQ中實現了這一點。這兩個示例都在可預測的迭代次數後鎖定:PLINQ爲8,TPL爲3。它的代碼在不使用TPL/PLINQ的情況下執行,運行良好。我已經以非線程安全的方式以及線程安全的方式實現了枚舉器。如果並行度限制爲1(如示例中的情況),則可以使用前者。非線程安全的枚舉器非常簡單,不依賴任何'花哨的'.NET庫類。如果我增加並行度,則在死鎖之前執行的迭代次數增加,例如對於PLINQ,迭代次數爲8 *並行度。循環枚舉的PLINQ迭代導致死鎖

下面是迭代器:
枚舉(非線程)

public class SimpleEnumerable<T>: IEnumerable<T> 
{ 
    private T _value; 
    private readonly AutoResetEvent _releaseValueEvent = new AutoResetEvent(false); 

    IEnumerator IEnumerable.GetEnumerator() 
    { 
     return GetEnumerator(); 
    } 

    public IEnumerator<T> GetEnumerator() 
    { 
     while(true) 
     { 
      _releaseValueEvent.WaitOne(); 
      yield return _value; 
     } 
    } 

    public void OnNext(T value) 
    { 
     _value = value; 
     _releaseValueEvent.Set(); 
    } 
} 

枚舉(線程)

public class SimpleEnumerable<T>: IEnumerable<T> 
{ 
    private readonly BlockingCollection<T> _blockingCollection = new BlockingCollection<T>(); 

    IEnumerator IEnumerable.GetEnumerator() 
    { 
     return GetEnumerator(); 
    } 

    public IEnumerator<T> GetEnumerator() 
    { 
     while(true) 
     { 
      yield return _blockingCollection.Take(); 
     } 
    } 

    public void OnNext(T value) 
    { 
     _blockingCollection.Add(value); 
    } 
} 

PLINQ實施例:

public static void Main(string[] args) 
{ 
    var enumerable = new SimpleEnumerable<int>(); 
    enumerable.OnNext(0); 

    enumerable 
     .Do(i => Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}")) 
     .AsParallel() 
     .WithDegreeOfParallelism(1) 
     .ForEach 
     (
      i => 
      { 
       Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}"); 
       enumerable.OnNext(i+1); 
      } 
     ); 
} 

TPL實例:在我的調用堆棧的分析

public static void Main(string[] args) 
{ 
    var enumerable = new SimpleEnumerable<int>(); 
    enumerable.OnNext(0); 

    Parallel.ForEach 
    (
     enumerable, 
     new ParallelOptions { MaxDegreeOfParallelism = 1}, 
     i => 
     { 
      Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}"); 
      enumerable.OnNext(i+1); 
     } 
    ); 
} 

基地,似乎存在着發生在無論是在PLINQ和TPL一個分區相關方法的僵局,但我不是確定如何解釋這一點。

通過試驗和錯誤我發現包裝PLINQ enumerablePartitioner.Create(enumerable, EnumerablePartitionerOptions.NoBuffering)修復了問題,但我不知道爲什麼發生死鎖。

我會很有興趣找出錯誤的根源。

請注意,這是一個人爲的例子。我不是在尋找批評的代碼,而是爲什麼是發生死鎖。具體而言,在PLINQ示例中,如果.AsParallel().WithDegreeOfParallelism(1)行被註釋掉,則代碼工作得很好。

+0

PLINQ和並行不會導致死鎖,他們使用當前線程與其他人並行處理數據 –

+0

@PanagiotisKanavos顯然,死鎖是在他的迭代器中。乍一看,我並不感到驚訝,它肯定看起來並不安全。 – Servy

+0

@Servy我從最明顯的問題開始。迭代器...是一個非常非常奇怪的結構。一個簡單的10K數組將足以測試並行執行。 'Interlocked.Increment'將是一個非常好的方法來保持計數。這個迭代器只是阻止了 –

回答

2

你實際上並沒有一個邏輯順序的值,所以試圖首先創建一個IEnumerable根本沒有任何意義。此外,您幾乎可以肯定不會嘗試創建可由多個線程使用的IEnumerator。這就是瘋狂,僅僅是因爲IEnumerator公開的界面並沒有真正暴露出你想要做的事情。您可能會創建一個IEnumerator,它只會由單個線程使用,該線程將根據多個線程使用的基礎數據源來計算數據的返回值,因爲這是相當不同的。

如果您只是想創建一個生產者和消費者在不同的線程中運行,請不要在BlockingCollection周圍創建自己的「包裝」,只需使用BlockingCollection。讓生產者加入它,然後消費者從中讀取它。消費者可以使用GetConsumingEnumerable,如果它只是想在對這些項目進行迭代時進行迭代(一種常見的操作)。

+0

我給出的例子是從完全不同的代碼中提取的。這是證明問題的一個人爲的例子。原始代碼沒有使用枚舉器或阻塞集合。我讚賞你的評論,但我會說你完全錯過了練習的要點。我沒有太多的要求批評'代碼是做什麼'或'代碼是如何做'的,而是'爲什麼'是PLINQ和TPL中的代碼死鎖。 –

+1

@TomasC你說你真正關心的代碼看起來什麼都沒有,就像你展示的代碼一樣,意味着你不可能獲得關於你關心的事情的任何信息。如果你問一個關於如何在多個線程中使用迭代器的問題時,如果你實際上沒有一個枚舉器,並且它沒有共享你的例子的實現細節,那麼我們完全不瞭解你的情況,顯然不能評論在上面。我們只能對您實際提供的代碼發表評論。 – Servy

+0

我很清楚如何使用枚舉器。我沒有詢問如何使用來自多個線程的枚舉器。我知道該怎麼做。我只是在PLINQ代碼中出現了一些導致死鎖的行爲,並且我希望得到它爲什麼會死鎖的原因(正如我在之前的評論中明確指出的那樣)。你似乎忽略了我所問的問題,即爲什麼它會陷入僵局?這就是我想知道的。否則,您的意見是非常感謝和非常有見地的。 –