1

在本週早些時候得到了一些有關Stackoverflow的幫助,這導致了生產者/消費者模式可以用於加載處理並將大數據集導入RavenDb。 Parallelization of CPU bound task continuing with IO bound節流生產者/消費者模式導致死鎖

爲了管理內存消耗,我現在正在尋求限制生產者提前準備的工作單元數量。我已經使用基本的信號量實施了限制,但我在某個時候遇到了實現死鎖的問題。

我找不出可能導致死鎖的原因。下面是代碼的摘錄:

private static void LoadData<TParsedData, TData>(IDataLoader<TParsedData> dataLoader, int batchSize, Action<IndexedBatch<TData>> importProceedure, Func<IEnumerable<TParsedData>, List<TData>> processProceedure) 
    where TParsedData : class 
    where TData : class 
{ 
    Console.WriteLine(@"Loading {0}...", typeof(TData).ToString()); 

    var batchCounter = 0; 

    var ist = Stopwatch.StartNew(); 

    var throttler = new SemaphoreSlim(10); 
    var bc = new BlockingCollection<IndexedBatch<TData>>(); 
    var importTask = Task.Run(() => 
    { 
     bc.GetConsumingEnumerable() 
      .AsParallel() 
      .WithExecutionMode(ParallelExecutionMode.ForceParallelism) 
      //or 
      //.WithDegreeOfParallelism(1) 
      .WithMergeOptions(ParallelMergeOptions.NotBuffered) 
      .ForAll(data => 
      { 
       var st = Stopwatch.StartNew(); 
       importProceedure(data); 

       Console.WriteLine(@"Batch imported {0} in {1} ms", data.Index, st.ElapsedMilliseconds); 
       throttler.Release(); 
      }); 
    }); 
    var processTask = Task.Run(() => 
    { 
     dataLoader.GetParsedItems() 
      .Partition(batchSize) 
      .AsParallel() 
      .WithDegreeOfParallelism(Environment.ProcessorCount) 
      //or 
      //.WithDegreeOfParallelism(1) 
      .WithMergeOptions(ParallelMergeOptions.NotBuffered) 
      .ForAll(batch => 
      { 
       throttler.Wait(); //.WaitAsync() 
       var batchno = ++batchCounter; 
       var st = Stopwatch.StartNew(); 

       bc.Add(new IndexedBatch<TData>(batchno, processProceedure(batch))); 

       Console.WriteLine(@"Batch processed {0} in {1} ms", batchno, st.ElapsedMilliseconds); 
      }); 
    }); 

    processTask.Wait(); 
    bc.CompleteAdding(); 
    importTask.Wait(); 

    Console.WriteLine(nl(1) + @"Loading {0} completed in {1} ms", typeof(TData).ToString(), ist.ElapsedMilliseconds); 
} 

public class IndexedBatch<TBatch> 
    where TBatch : class 
{ 
    public IndexedBatch(int index, List<TBatch> batch) 
    { 
     Index = index; 
     Batch = batch ?? new List<TBatch>(); 
    } 

    public int Index { get; set; } 
    public List<TBatch> Batch { get; set; } 
} 

這是該呼叫到LoadData提出:

LoadData<DataBase, Data>(
    DataLoaderFactory.Create<DataBase>(datafilePath), 
    1024, 
    (data) => 
    { 
     using (var session = Store.OpenSession()) 
     { 
      foreach (var i in data.Batch) 
      { 
       session.Store(i); 
       d.TryAdd(i.LongId.GetHashCode(), int.Parse(i.Id.Substring(i.Id.LastIndexOf('/') + 1))); 
      } 
      session.SaveChanges(); 
     } 
    }, 
    (batch) => 
    { 
     return batch.Select(i => new Data() 
     { 
      ... 
     }).ToList(); 
    } 
); 

Store是一個RavenDb IDocumentStore。 DataLoaderFactory爲給定數據集構造一個自定義分析器。

回答

1

很難調試沒有大箭頭說「塊在這裏!」的死鎖。避免在沒有調試器的情況下調試代碼:BlockingCollection已經可以調節。使用the constructor,該參數採用int boundedCapacity參數並消除信號量。解決您的僵局的可能性非常高。

+0

這可以在沒有死鎖的情況下工作,但節流不會停在boundedCapacity。我已經有75項以上(將其設置爲10)。不知道這是爲什麼? – magix 2012-07-30 08:59:39

1

你可以檢查你有多少線程?由於阻塞,你可能已經耗盡了線程池。如果它認爲你的代碼在沒有它們的情況下會死鎖,那麼TPL會注入比ProcessorCount更多的線程。但它只能達到一定的限度。

無論如何,封鎖TPL內部的任務通常是一個壞主意,因爲內置的啓發式方法在非阻塞的情況下效果最好。

+0

這是在x64 .NET 4.5上。想想這些線程可能會耗盡嗎?我會看一看。 – magix 2012-07-30 09:08:58

相關問題