2017-05-22

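Limit the number of requests per second generated by the Dns.BeginGetHostEntry method, or use the Task Parallel Library (TPL)

I used the Dns.BeginGetHostEntry method to get the FQDN of hosts from their host names (the list of host names is stored in a SQL Server database). This asynchronous approach processed nearly 150k records in under 30 minutes and wrote the FQDN back into the same SQL table that stores the host names.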

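The solution runs too fast (it exceeds the threshold of 300 requests per second). Because the number of requests a server is allowed to generate is limited, my server was listed among the top talkers and I was asked to stop running this application. I had to rebuild the application to run synchronously, and it now takes more than 6 hours to complete.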

// TotalRecords holds the host names fetched from the SQL database 
for (int i = 0; i < TotalRecords.Rows.Count; i++) 
{ 
    try 
    { 
     host = TotalRecords.Rows[i].ItemArray[0].ToString(); 
     // count outstanding lookups; GetHostEntryCallback (not shown) presumably decrements it 
     Interlocked.Increment(ref requestCounter); 
     string[] arr = new string[] { i.ToString(), host }; 
     // BeginGetHostEntry returns immediately, so every row is requested almost at once 
     Dns.BeginGetHostEntry(host, GetHostEntryCallback, arr); 
    } 
    catch (Exception ex) 
    { 
     log.Error("Unknown error occurred\n ", ex); 
    } 
} 
// spin until every callback has finished 
do 
{ 
    Thread.Sleep(0); 

} while (Volatile.Read(ref requestCounter) > 0); 

ListAdapter.Update(TotalRecords);

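Questions:

  1. Is there any way to limit the number of requests per second generated by this method?

  2. My understanding is that ParallelOptions.MaxDegreeOfParallelism only limits how many operations run concurrently, not how many start per second. Is there any way TPL could be a better option here? Can it limit the number of requests per second?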


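Have you considered a queue that is set to process only one item every x, so that it runs at most 300 times per second? (300 per second is quite a lot, after all.) – BugFinder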
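BugFinder's queue idea can be sketched with the base class library alone. This is a minimal sketch, not code from the thread: QueueThrottle and RunAsync are hypothetical names. It starts a small batch of jobs on every tick instead of a single item, on the assumption that timer-based delays can be as coarse as about 15 ms on Windows, which is too coarse to space single requests 3.3 ms apart for 300 requests per second. Jobs are started without being awaited, so a slow DNS lookup does not lower the start rate.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class QueueThrottle
{
    // Drains `items` from a queue, starting at most `perTick` jobs every `tick`.
    // Jobs are started without awaiting them, so slow jobs do not lower the
    // start rate. Returns all results, in queue order, once every job is done.
    public static async Task<TResult[]> RunAsync<TItem, TResult>(
        IEnumerable<TItem> items,
        Func<TItem, Task<TResult>> job,
        int perTick,
        TimeSpan tick)
    {
        if (perTick < 1) throw new ArgumentOutOfRangeException(nameof(perTick));
        var queue = new Queue<TItem>(items);
        var started = new List<Task<TResult>>();

        while (queue.Count > 0)
        {
            // Start one batch of up to perTick jobs.
            for (int i = 0; i < perTick && queue.Count > 0; i++)
                started.Add(job(queue.Dequeue()));

            // Pause for one tick before the next batch (no pause after the last one).
            if (queue.Count > 0)
                await Task.Delay(tick);
        }
        return await Task.WhenAll(started);
    }
}
```

For example, QueueThrottle.RunAsync(hostNames, h => Dns.GetHostEntryAsync(h), 30, TimeSpan.FromMilliseconds(100)) would start at most about 30 lookups per 100 ms, roughly 300 per second; hostNames stands for the host column read from the database. A single failed lookup makes Task.WhenAll throw, so in practice each job would catch its own exceptions.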


Possible duplicate of [Simple way to rate limit HttpClient requests](http://stackoverflow.com/questions/35493925/simple-way-to-rate-limit-httpclient-requests) – bradgonesurfing


Yes, a SemaphoreSlim and a timer can solve this problem –

Answers


A pure async solution.

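It uses the NuGet packages Nito.AsyncEx and System.Reactive. It collects errors as they occur and delivers the DNS results as an IObservable<IPHostEntry>.

There is a lot going on here. You need to understand reactive extensions as well as standard async programming. There are probably many ways to achieve the result below, but this is an interesting solution.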
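The pacing in the Delay helper of the following code works against a fixed schedule rather than sleeping a fixed time after each item. With a target rate r (items per second), item n is due t_n seconds after the start, and the wait is whatever remains of that time:

```latex
t_n = \frac{n}{r}, \qquad
\mathrm{delay}_n = \frac{n}{r} - t_{\mathrm{elapsed}}
```

For example, with r = 25 the 50th item is due at 50/25 = 2 s. If 1.5 s have elapsed, Delay returns 0.5 s; if 2.3 s have elapsed, it returns -0.3 s and the item starts at once. Because item n never starts before n/r, at most rT items have started by any time T, so the average stays at or below r, even though several items can start in a short burst after the loop has fallen behind schedule.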

using System; 
using System.Collections.Concurrent; 
using System.Threading.Tasks; 
using System.Linq; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.Net; 
using System.Reactive.Disposables; 
using System.Reactive.Linq; 
using Nito.AsyncEx; 
using System.Threading; 

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed 

public static class EnumerableExtensions 
{ 
    public static IEnumerable<Func<U>> Defer<T, U> 
     (this IEnumerable<T> source, Func<T, U> selector) 
     => source.Select(s => (Func<U>)(() => selector(s))); 
} 


public class Program 
{ 
    /// <summary> 
    /// Returns the time to wait before processing another item 
    /// if the rate limit is to be maintained 
    /// </summary> 
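    /// <param name="desiredRateLimit">Maximum number of items to start per second.</param> 
    /// <param name="currentItemCount">1-based index of the item about to start.</param> 
    /// <param name="elapsedTotalSeconds">Seconds elapsed since processing began.</param> 
    /// <returns>Seconds to wait; zero or negative means the item can start immediately.</returns> 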
    private static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds) 
    { 
     var time = elapsedTotalSeconds; 
     var timeout = currentItemCount/desiredRateLimit; 
     return timeout - time; 
    } 

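    /// <summary> 
    /// Consume the tasks in parallel but with a rate limit. The results 
    /// are returned as an observable. 
    /// </summary> 
    /// <typeparam name="T">Result type of each task.</typeparam> 
    /// <param name="tasks">Factories that each start one task when invoked.</param> 
    /// <param name="rateLimit">Maximum number of tasks started per second.</param> 
    /// <returns>An observable that yields each result as soon as its task completes.</returns> 
    public static IObservable<T> RateLimit<T>(IEnumerable<Func<Task<T>>> tasks, double rateLimit){ 
     return Observable.Create<T> 
      (observer => 
      { 
       // Per-subscription state, so that two subscriptions do not share counters. 
       var s = System.Diagnostics.Stopwatch.StartNew(); 
       var n = 0; 
       // Starts at 1 for the producer loop itself; each running task adds 1. 
       var sem = new AsyncCountdownEvent(1); 
       var errors = new ConcurrentBag<Exception>(); 
       // Rx requires OnNext calls to be serialized, but the tasks below finish concurrently. 
       var safeObserver = System.Reactive.Observer.Synchronize(observer); 

       var ctx = new CancellationTokenSource(); 
       Task.Run 
        (async() => 
        { 
         foreach (var taskFn in tasks) 
         { 
          n++; 
          ctx.Token.ThrowIfCancellationRequested(); 

          // Wait until starting item n keeps us at or below rateLimit items per second. 
          var delay = Delay(rateLimit, n, s.Elapsed.TotalSeconds); 
          if (delay > 0) 
           await Task.Delay(TimeSpan.FromSeconds(delay), ctx.Token); 

          sem.AddCount(1); 
          Task.Run 
           (async() => 
           { 
            try 
            { 
             safeObserver.OnNext(await taskFn()); 
            } 
            catch (Exception e) 
            { 
             // Collect failures and report them together at the end. 
             errors.Add(e); 
            } 
            finally 
            { 
             sem.Signal(); 
            } 
           } 
           , ctx.Token); 
         } 
         // Release the producer's own count, then wait for all in-flight tasks. 
         sem.Signal(); 
         await sem.WaitAsync(ctx.Token); 
         if(errors.Count>0) 
          safeObserver.OnError(new AggregateException(errors)); 
         else 
          safeObserver.OnCompleted(); 
        } 
         , ctx.Token); 

       return Disposable.Create(() => ctx.Cancel()); 
      }); 
    } 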

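    #region hosts 

    // Only one host is listed here; the full host list used for the output 
    // below is in the gist linked after the output. 
    public static string [] Hosts = new [] { "google.com" }; 

    #endregion 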


    public static void Main() 
    { 
     var s = System.Diagnostics.Stopwatch.StartNew(); 

     var rate = 25; 

     var n = Hosts.Length; 

     var expectedTime = n/rate; 

     IEnumerable<Func<Task<IPHostEntry>>> dnsTaskFactories = Hosts.Defer(async host => 
     { 
      try 
      { 
       return await Dns.GetHostEntryAsync(host); 
      } 
      catch (Exception e) 
      { 
       throw new Exception($"Can't resolve {host}", e); 
      } 
     }); 

     IObservable<IPHostEntry> results = RateLimit(dnsTaskFactories, rate); 

     results 
      .Subscribe(result => 
      { 
       Console.WriteLine("result " + DateTime.Now + " " + result.AddressList[0].ToString()); 
      }, 
      onCompleted:() => 
      { 
       Console.WriteLine("Completed"); 

       PrintTimes(s, expectedTime); 
      }, 
      onError: e => 
      { 
       Console.WriteLine("Errored"); 

       PrintTimes(s, expectedTime); 

       if (e is AggregateException ae) 
       { 
        Console.WriteLine(e.Message); 
        foreach (var innerE in ae.InnerExceptions) 
        { 
         Console.WriteLine($"  " + innerE.GetType().Name + " " + innerE.Message); 
        } 
       } 
       else 
       { 
         Console.WriteLine($"got error " + e.Message); 
       } 
      } 

      ); 

     Console.WriteLine("Press enter to exit"); 
     Console.ReadLine(); 
    } 

    private static void PrintTimes(Stopwatch s, int expectedTime) 
    { 
     Console.WriteLine("Done"); 
     Console.WriteLine("Elapsed Seconds " + s.Elapsed.TotalSeconds); 
     Console.WriteLine("Expected Elapsed Seconds " + expectedTime); 
    } 
} 

The last few lines of the output are:

result 5/23/2017 3:23:36 PM 84.16.241.74 
result 5/23/2017 3:23:36 PM 84.16.241.74 
result 5/23/2017 3:23:36 PM 157.7.105.52 
result 5/23/2017 3:23:36 PM 223.223.182.225 
result 5/23/2017 3:23:36 PM 64.34.93.5 
result 5/23/2017 3:23:36 PM 212.83.211.103 
result 5/23/2017 3:23:36 PM 205.185.216.10 
result 5/23/2017 3:23:36 PM 198.232.125.32 
result 5/23/2017 3:23:36 PM 66.231.176.100 
result 5/23/2017 3:23:36 PM 54.239.34.12 
result 5/23/2017 3:23:36 PM 54.239.34.12 
result 5/23/2017 3:23:37 PM 219.84.203.116 
Errored 
Done 
Elapsed Seconds 19.9990118 
Expected Elapsed Seconds 19 
One or more errors occurred. 
    Exception Can't resolve adv758968.ru 
    Exception Can't resolve fr.a3dfp.net 
    Exception Can't resolve ads.adwitserver.com 
    Exception Can't resolve www.adtrader.com 
    Exception Can't resolve trak-analytics.blic.rs 
    Exception Can't resolve ads.buzzcity.net 

I can't post the full code here, so here is a link to the code with the complete host list.

https://gist.github.com/bradphelan/084e4b1ce2604bbdf858d948699cc190


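Thanks for the input. Is there a possible solution that doesn't involve NuGet packages? I've added a few questions to the original post about what seem to be bottlenecks in my application. Please take a look and help me get to a more complete solution. –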


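What's the problem with including NuGet packages? The CountdownEvent included in the framework doesn't have a WaitAsync method, perhaps an oversight. If you don't include the package, you have to build your own counter from SemaphoreSlim and a few other tedious pieces. – bradgonesurfing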
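Without Nito.AsyncEx, the AsyncCountdownEvent used in the answer can be replaced by a small class built on TaskCompletionSource and Interlocked. This is a minimal sketch under that assumption: SimpleAsyncCountdown is a hypothetical name, it covers only the AddCount, Signal and WaitAsync calls that the rate-limit code makes, and it is not the Nito.AsyncEx implementation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Minimal BCL-only stand-in for an async countdown event (hypothetical name).
public sealed class SimpleAsyncCountdown
{
    private readonly TaskCompletionSource<bool> _done =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    private long _count;

    public SimpleAsyncCountdown(long initialCount)
    {
        if (initialCount < 1) throw new ArgumentOutOfRangeException(nameof(initialCount));
        _count = initialCount;
    }

    // Call only while the count is still above zero (for example while the
    // producer loop still holds its own initial count); once the count has
    // reached zero the event has already fired and stays completed.
    public void AddCount(long amount = 1) => Interlocked.Add(ref _count, amount);

    public void Signal()
    {
        long remaining = Interlocked.Decrement(ref _count);
        if (remaining == 0)
            _done.TrySetResult(true);
        else if (remaining < 0)
            throw new InvalidOperationException("Signal called more often than the count allows.");
    }

    // Completes when the count reaches zero; throws OperationCanceledException
    // if the token is cancelled first.
    public async Task WaitAsync(CancellationToken token = default)
    {
        if (!token.CanBeCanceled)
        {
            await _done.Task.ConfigureAwait(false);
            return;
        }
        var cancelled = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        using (token.Register(() => cancelled.TrySetResult(true)))
        {
            await Task.WhenAny(_done.Task, cancelled.Task).ConfigureAwait(false);
        }
        // If the countdown finished, report success even if the token also fired.
        if (!_done.Task.IsCompleted)
            token.ThrowIfCancellationRequested();
    }
}
```

In the RateLimit method it would take the place of AsyncCountdownEvent one for one: new SimpleAsyncCountdown(1), AddCount(1) before each task, Signal() in each finally block and once after the loop, then await WaitAsync(ctx.Token).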


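(1 and 2) are already answered by my answer. (3) Ask a separate Stack Overflow question about this; it is only indirectly related to the rate-limiting problem. (4) Simple: compute (number of successful requests / time since start) and print it to the console or log it somewhere. (5) Ask another Stack Overflow question; it is unrelated to the rate-limiting problem. – bradgonesurfing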
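Point (4) amounts to dividing the number of successful requests by the seconds elapsed since the start. A minimal sketch of such a counter follows; ThroughputMeter is a hypothetical name, and RecordSuccess would be called wherever a lookup completes successfully (for example in the DNS callback).

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Counts successful requests and reports the average rate since creation:
// successful requests / elapsed seconds.
public sealed class ThroughputMeter
{
    private readonly Stopwatch _clock = Stopwatch.StartNew();
    private long _succeeded;

    // Safe to call from many callbacks at once.
    public void RecordSuccess() => Interlocked.Increment(ref _succeeded);

    public long Succeeded => Interlocked.Read(ref _succeeded);

    public double RequestsPerSecond
    {
        get
        {
            double seconds = _clock.Elapsed.TotalSeconds;
            return seconds > 0 ? Succeeded / seconds : 0;
        }
    }

    public override string ToString() =>
        $"{Succeeded} ok, {RequestsPerSecond:F1} req/s";
}
```

Printing meter.ToString() once per second, or writing it to the log, shows whether the average stays under the 300 requests-per-second limit.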


Use a SemaphoreSlim and a Timer to limit the number of requests per period.

[DebuggerDisplay("Current Count = {_semaphore.CurrentCount}")] 
public class TimedSemaphoreSlim : IDisposable 
{ 
    private readonly System.Threading.SemaphoreSlim _semaphore; 
    private readonly System.Threading.Timer _timer; 
    private int _releaseCount; 

    public TimedSemaphoreSlim(int initialcount, TimeSpan period) 
    { 
     _semaphore = new System.Threading.SemaphoreSlim(initialcount); 
     _timer = new System.Threading.Timer(OnTimer, this, period, period); 
    } 

    public TimedSemaphoreSlim(int initialCount, int maxCount, TimeSpan period) 
    { 
     _semaphore = new SemaphoreSlim(initialCount, maxCount); 
     _timer = new Timer(OnTimer, this, period, period); 
    } 

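    private void OnTimer(object state) 
    { 
     // Hand back, in one go, every permit that was returned since the last tick. 
     var releaseCount = Interlocked.Exchange(ref _releaseCount, 0); 
     if (releaseCount > 0) 
     { 
      try 
      { 
       _semaphore.Release(releaseCount); 
      } 
      catch (ObjectDisposedException) 
      { 
       // The timer can still fire while Dispose() runs; ignore that last tick. 
      } 
     } 
    } 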

    public WaitHandle AvailableWaitHandle => _semaphore.AvailableWaitHandle; 
    public int CurrentCount => _semaphore.CurrentCount; 

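    /// <summary>Returns a permit; it becomes available again only at the next timer tick.</summary> 
    public void Release() 
    { 
     Interlocked.Increment(ref _releaseCount); 
    } 

    /// <summary>Returns several permits; they become available again only at the next timer tick.</summary> 
    public void Release(int releaseCount) 
    { 
     Interlocked.Add(ref _releaseCount, releaseCount); 
    } 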

    public void Wait() 
    { 
     _semaphore.Wait(); 
    } 

    public void Wait(CancellationToken cancellationToken) 
    { 
     _semaphore.Wait(cancellationToken); 
    } 

    public bool Wait(int millisecondsTimeout) 
    { 
     return _semaphore.Wait(millisecondsTimeout); 
    } 

    public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken) 
    { 
     return _semaphore.Wait(millisecondsTimeout, cancellationToken); 
    } 

    public bool Wait(TimeSpan timeout, CancellationToken cancellationToken) 
    { 
     return _semaphore.Wait(timeout, cancellationToken); 
    } 

    public Task WaitAsync() 
    { 
     return _semaphore.WaitAsync(); 
    } 

    public Task WaitAsync(CancellationToken cancellationToken) 
    { 
     return _semaphore.WaitAsync(cancellationToken); 
    } 

    public Task<bool> WaitAsync(int millisecondsTimeout) 
    { 
     return _semaphore.WaitAsync(millisecondsTimeout); 
    } 

    public Task<bool> WaitAsync(TimeSpan timeout) 
    { 
     return _semaphore.WaitAsync(timeout); 
    } 

    public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken) 
    { 
     return _semaphore.WaitAsync(millisecondsTimeout, cancellationToken); 
    } 

    public Task<bool> WaitAsync(TimeSpan timeout, CancellationToken cancellationToken) 
    { 
     return _semaphore.WaitAsync(timeout, cancellationToken); 
    } 

    #region IDisposable Support 
    private bool disposedValue = false; // To detect redundant calls. 

    private void CheckDisposed() 
    { 
     if (disposedValue) 
     { 
      throw new ObjectDisposedException(nameof(TimedSemaphoreSlim)); 
     } 
    } 

    protected virtual void Dispose(bool disposing) 
    { 
     if (!disposedValue) 
     { 
      if (disposing) 
      { 
       _timer.Dispose(); 
       _semaphore.Dispose(); 
      } 

      disposedValue = true; 
     } 
    } 

    public void Dispose() 
    { 
     Dispose(true); 
    } 
    #endregion 
} 

Example usage

IEnumerable<string> bunchOfHosts = GetBunchOfHosts(); 
IList<IPHostEntry> result; 

using (var limiter = new TimedSemaphoreSlim(300, 300, TimeSpan.FromSeconds(1))) 
{ 
    result = bunchOfHosts.AsParallel() 
     .Select(e => 
     { 
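      // take a permit; blocks until one is available (at most 300 per period) 
      limiter.Wait(); 
      try 
      { 
       return Dns.GetHostEntry(e); 
      } 
      finally 
      { 
       // the permit is handed back to the pool only at the next timer tick 
       limiter.Release(); 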
      } 
     }) 
     .ToList(); 
} 

This doesn't work well once you replace Console.WriteLine with something slow like a DNS request. If you want to maintain a given rate, you need to run those requests in parallel. – bradgonesurfing
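One way to address this concern is to stop tying permits to request completion: a timer refills the pool to its full allowance at every window, and callers never hand permits back. The sketch below assumes that design; WindowRateLimiter and RunAllAsync are hypothetical names, not part of the answer above or of the framework. Because lookups run concurrently and are awaited asynchronously, a slow lookup does not hold a permit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Fixed-window limiter: at most permitsPerWindow acquisitions between two refills.
public sealed class WindowRateLimiter : IDisposable
{
    private readonly SemaphoreSlim _permits;
    private readonly Timer _refill;
    private readonly int _perWindow;

    public WindowRateLimiter(int permitsPerWindow, TimeSpan window)
    {
        _perWindow = permitsPerWindow;
        _permits = new SemaphoreSlim(permitsPerWindow, permitsPerWindow);
        _refill = new Timer(_ => Refill(), null, window, window);
    }

    private void Refill()
    {
        try
        {
            // Top the pool back up to the full allowance for the new window.
            int missing = _perWindow - _permits.CurrentCount;
            if (missing > 0) _permits.Release(missing);
        }
        catch (ObjectDisposedException) { /* tick raced with Dispose() */ }
        catch (SemaphoreFullException) { /* an overlapping tick already refilled */ }
    }

    // Starts each job as soon as it gets a permit; jobs run concurrently and
    // no permit is returned afterwards, because the timer refills the window.
    public Task<TResult[]> RunAllAsync<TItem, TResult>(
        IEnumerable<TItem> items, Func<TItem, Task<TResult>> job)
    {
        return Task.WhenAll(items.Select(async item =>
        {
            await _permits.WaitAsync().ConfigureAwait(false);
            return await job(item).ConfigureAwait(false);
        }));
    }

    public void Dispose()
    {
        _refill.Dispose();
        _permits.Dispose();
    }
}
```

This is a fixed-window limiter: at most permitsPerWindow requests start between two refills, but up to twice that many can start within a short span that straddles a refill. RunAllAsync creates one waiting task per item up front, which is simple but means 150k pending tasks for the full host table.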


@bradgonesurfing I was thinking about that too, and built this class to be used like SemaphoreSlim (Wait(); Act(); Release();), but the actual release is done by the internal timer –


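Have you considered using the TPL Dataflow library? It has a very convenient way to limit concurrent operations of the same type. It also lets you throttle the whole pipeline by limiting the buffer size.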

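Basically, all you need is a pipeline made of a BufferBlock that queues the host names, a TransformBlock that performs the DNS lookups, a BatchBlock that groups the results, and an ActionBlock that saves each batch to the database.

So your code could look like this: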

// buffer limited to 30 items in queue 
// all other items would be postponed and added to queue automatically 
// order in queue is preserved 
var hosts = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 30 }); 

// get a host and perform a DNS lookup; a failed lookup returns null 
// instead of faulting the block (and with it the whole pipeline) 
var handler = new TransformBlock<string, IPHostEntry>(async host => 
    { 
        try { return await Dns.GetHostEntryAsync(host); } 
        catch (System.Net.Sockets.SocketException) { return null; } 
    }, 
    // no more than 10 simultaneous requests at a time; the bounded capacity keeps 
    // this block from pulling every host out of the BufferBlock at once 
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, BoundedCapacity = 10 }); 

// gather results in arrays of up to 500 items (the last batch may be smaller) 
var batchBlock = new BatchBlock<IPHostEntry>(500); 

// get the resulting array and save it to the database 
// (GetHostEntryCallback is assumed here to be rewritten to accept a batch) 
var batchSave = new ActionBlock<IPHostEntry[]>(r => GetHostEntryCallback(r)); 

// link all the blocks to automatically propagate items along the pipeline 
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true }; 
hosts.LinkTo(handler, linkOptions); 
// successful lookups go on to the batch; failed ones (null) are discarded 
handler.LinkTo(batchBlock, linkOptions, entry => entry != null); 
handler.LinkTo(DataflowBlock.NullTarget<IPHostEntry>()); 
batchBlock.LinkTo(batchSave, linkOptions); 

// provide the data to pipeline 
for (var i = 0; i < TotalRecords.Rows.Count; ++i) 
{ 
    var host = TotalRecords.Rows[i].ItemArray[0].ToString(); 
    // async wait for item to be sent to pipeline 
    // will throttle once the buffer queue is full 
    await hosts.SendAsync(host); 
} 

// all data has been sent; mark the pipeline input as complete 
hosts.Complete(); 

// wait for the last block to finish its execution 
await batchSave.Completion; 

// notify user that update is over 

I encourage you to read the whole How-to section on MSDN to better understand what you can do with this library, and perhaps continue with the official documentation.

By the way, you could use the SqlBulkCopy class to update the database if it fits your requirements; it is usually faster than regular updates through SqlDataAdapter.


The requirement is to throttle based on time (requests per second); MaxDegreeOfParallelism doesn't help here. –


You can implement a simple block to control the throughput, but nothing built into TPL provides this for you. – VMAtm
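A rate-controlling block of that kind can be sketched as a TransformBlock that delays each item until its slot in a fixed schedule (item k, counting from 0, is due k/r seconds after the first) and then passes it on unchanged. This is a sketch under stated assumptions rather than a TPL feature: ThrottleBlock.Create is a hypothetical helper, and it needs the System.Threading.Tasks.Dataflow package, as the pipeline above does.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottleBlock
{
    // Pass-through block that releases items at no more than `perSecond` per
    // second on average.
    public static IPropagatorBlock<T, T> Create<T>(double perSecond)
    {
        if (perSecond <= 0) throw new ArgumentOutOfRangeException(nameof(perSecond));
        Stopwatch clock = null;
        long released = 0;
        // Default options mean MaxDegreeOfParallelism = 1: items are handled one
        // at a time, so the captured clock/released state is never shared concurrently.
        return new TransformBlock<T, T>(async item =>
        {
            if (clock == null) clock = Stopwatch.StartNew();
            // Item k (0-based) is due k / perSecond seconds after the first one.
            var due = TimeSpan.FromSeconds(released++ / perSecond);
            var wait = due - clock.Elapsed;
            if (wait > TimeSpan.Zero)
                await Task.Delay(wait).ConfigureAwait(false);
            return item;
        });
    }
}
```

In the pipeline above it would sit between hosts and handler (for example throttle = ThrottleBlock.Create<string>(300), then hosts.LinkTo(throttle, linkOptions) and throttle.LinkTo(handler, linkOptions)), while MaxDegreeOfParallelism on handler only needs to be high enough that slow lookups do not hold back the schedule.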


Any reason for the downvote? – VMAtm