2017-04-11 102 views
2

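Adding items to a ConcurrentBag used by Parallel.ForEach (C#)

I want to crawl several URLs concurrently. Each request may add more URLs to a ConcurrentBag to be crawled. At the moment I have a nasty while (true) loop that starts a new Parallel.ForEach to handle any new URLs.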

Is there any way to add to the contents of the ConcurrentBag so that Parallel.ForEach sees the new items and keeps iterating over them?

ConcurrentBag<LinkObject> URLSToCheck = new ConcurrentBag<LinkObject>(); 

while (true) 
{ 
    Parallel.ForEach(URLSToCheck, new ParallelOptions { MaxDegreeOfParallelism = 5 }, URL =>
    {
        Checker Checker = new Checker();

        URLDownloadResult result = Checker.downloadFullURL(URL.destinationURL);

        List<LinkObject> URLsToAdd = Checker.findInternalUrls(URL.sourceURL, result.html);

        foreach (var URLToAdd in URLsToAdd)
        {
            URLSToCheck.Add(new LinkObject { sourceURL = URLToAdd.sourceURL, destinationURL = URLToAdd.destinationURL });
        }
    });

    if (URLSToCheck.Count == 0) break;
} 
+0

Looking into recursive code might help. This is a typical case where it applies. By the way, beware of circular references. – Stefan

+0

Thanks, I'll check it out! :-) – jamie

Answers

2

TPL Dataflow comes in handy here. An ActionBlock does the job nicely:

// Requires: using System.Threading.Tasks.Dataflow;
// Declare the variable first so the lambda below can capture it and post back to the same block
ActionBlock<LinkObject> actionBlock = null;

actionBlock = new ActionBlock<LinkObject>(URL =>
{
    Checker Checker = new Checker();
    URLDownloadResult result = Checker.downloadFullURL(URL.destinationURL);
    List<LinkObject> URLsToAdd = Checker.findInternalUrls(URL.sourceURL, result.html);
    // Post returns bool, so wrap it in a lambda instead of passing the method group to ForEach
    URLsToAdd.ForEach(link => actionBlock.Post(link));
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

Then post your initial URLs to the actionBlock:

actionBlock.Post(url1); 
actionBlock.Post(url2); 
... 
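
The snippet above never tells the block when the crawl is finished, and it would revisit pages that link to each other. A minimal sketch of one way to handle both, assuming plain string URLs and a hypothetical `getLinks` delegate standing in for the `downloadFullURL` + `findInternalUrls` step (neither the class name nor the delegate comes from the original post):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class DataflowCrawlSketch
{
    // Crawls from `seeds` and returns every URL that was visited.
    // `getLinks` is a hypothetical stand-in for "download the page and extract its links".
    public static ICollection<string> Crawl(
        IEnumerable<string> seeds, Func<string, IEnumerable<string>> getLinks)
    {
        var seen = new ConcurrentDictionary<string, byte>(); // guards against circular links
        int pending = 1;                 // outstanding work; starts at 1 as a token held while seeding
        ActionBlock<string> block = null;

        void Enqueue(string url)
        {
            if (!seen.TryAdd(url, 0)) return;   // already queued or processed
            Interlocked.Increment(ref pending);
            block.Post(url);
        }

        void Done()
        {
            // When the last outstanding item finishes, nothing else can be posted.
            if (Interlocked.Decrement(ref pending) == 0) block.Complete();
        }

        block = new ActionBlock<string>(url =>
        {
            try
            {
                foreach (var link in getLinks(url)) Enqueue(link);
            }
            finally
            {
                Done();
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

        foreach (var seed in seeds) Enqueue(seed);
        Done();                   // release the seeding token
        block.Completion.Wait();  // blocks until every queued URL has been handled
        return seen.Keys;
    }
}
```

The counter starts at 1 so the block cannot complete while seeds are still being posted. In an async context, awaiting `block.Completion` would probably be preferable to `Wait()`.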
+0

Thanks, this really helped :-) If anyone else uses this, install Microsoft.Tpl.Dataflow via NuGet – jamie

3

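You could take a look at BlockingCollection.

BlockingCollection provides an implementation of the producer/consumer pattern: your producers add to the blocking collection, and your Parallel.ForEach consumes from it.

To do that, you will have to implement a custom partitioner for the BlockingCollection (the reason is explained here: https://blogs.msdn.microsoft.com/pfxteam/2010/04/06/parallelextensionsextras-tour-4-blockingcollectionextensions/).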

The partitioner:

class BlockingCollectionPartitioner<T> : Partitioner<T>
{
    private BlockingCollection<T> _collection;

    internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
    {
        if (collection == null)
            throw new ArgumentNullException("collection");
        _collection = collection;
    }

    public override bool SupportsDynamicPartitions
    {
        get { return true; }
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException("partitionCount");

        var dynamicPartitioner = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount).Select(_ => dynamicPartitioner.GetEnumerator()).ToArray();
    }

    public override IEnumerable<T> GetDynamicPartitions()
    {
        return _collection.GetConsumingEnumerable();
    }
}

You would then use it like this:

BlockingCollection<LinkObject> URLSToCheck = new BlockingCollection<LinkObject>(); 

Parallel.ForEach(
    new BlockingCollectionPartitioner<LinkObject>(URLSToCheck),
    new ParallelOptions { MaxDegreeOfParallelism = 5 },
    URL =>
    {
        //....
    });

On another thread, you add to the URLSToCheck collection:

URLSToCheck.Add(...) 

When you have finished processing URLs, call URLSToCheck.CompleteAdding() and Parallel.ForEach should stop automatically.
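
In the crawler from the question the consumers are also the producers, so the tricky part is knowing when to call CompleteAdding(). One possible approach is to count outstanding URLs, as in the sketch below. It makes several assumptions that are not in the original post: URLs are plain strings, `getLinks` is a hypothetical delegate standing in for the download and link-extraction step and is assumed not to throw, and the built-in `EnumerablePartitionerOptions.NoBuffering` partitioner (.NET 4.5 and later) is swapped in for the custom partitioner above purely to keep the example short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingCrawlSketch
{
    // Crawls from `seeds` and returns every URL that was visited.
    // `getLinks` is a hypothetical stand-in for "download the page and extract its links".
    public static ICollection<string> Crawl(
        IEnumerable<string> seeds, Func<string, IEnumerable<string>> getLinks)
    {
        var seen = new ConcurrentDictionary<string, byte>(); // skips URLs already queued
        using (var queue = new BlockingCollection<string>())
        {
            int pending = 1; // outstanding work; starts at 1 as a token held while seeding

            void Enqueue(string url)
            {
                if (!seen.TryAdd(url, 0)) return;
                Interlocked.Increment(ref pending);
                queue.Add(url);
            }

            void Done()
            {
                // Last outstanding item finished: no worker can add more, so close the queue.
                if (Interlocked.Decrement(ref pending) == 0) queue.CompleteAdding();
            }

            foreach (var seed in seeds) Enqueue(seed);
            Done(); // release the seeding token

            // NoBuffering hands items to workers one at a time instead of in chunks.
            var source = Partitioner.Create(
                queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);

            Parallel.ForEach(source, new ParallelOptions { MaxDegreeOfParallelism = 5 }, url =>
            {
                try
                {
                    foreach (var link in getLinks(url)) Enqueue(link);
                }
                finally
                {
                    Done();
                }
            });
        }
        return seen.Keys;
    }
}
```

A worker only adds links while its own URL is still counted as pending, so the counter cannot reach zero while an Add is in flight. Error handling is deliberately left out: if `getLinks` threw, the remaining workers could stay blocked waiting for items, so real code would need to catch failures inside the loop body.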