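Pure async solution.
It uses the NuGet packages Nito.AsyncEx
and System.Reactive.
It handles errors and provides the DNS results as they arrive, as an `IObservable<IPHostEntry>`.
There is a lot going on. You will need to understand Reactive Extensions as well as standard async programming. There are probably many ways to achieve the result below, but this is an interesting solution.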
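For readers less familiar with Rx, here is a minimal Rx-free sketch of the same pacing and error-collection idea using only standard async (no Nito.AsyncEx, no System.Reactive). `PacedRunner` and `RunPacedAsync` are hypothetical names, not part of this answer. Unlike `RateLimit` below, it does not stream results as they arrive; it returns them all once every task has finished and reports every failure together in one `AggregateException`.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not from the answer): an Rx-free variant of the same idea.
public static class PacedRunner
{
    public static async Task<T[]> RunPacedAsync<T>(
        IEnumerable<Func<Task<T>>> factories,
        double ratePerSecond,
        CancellationToken token = default)
    {
        var clock = Stopwatch.StartNew();
        var started = new List<Task<T>>();
        var n = 0;

        foreach (var factory in factories)
        {
            n++;
            // Same rule as Delay(): item n may not start before n / rate seconds.
            var wait = n / ratePerSecond - clock.Elapsed.TotalSeconds;
            if (wait > 0)
                await Task.Delay(TimeSpan.FromSeconds(wait), token);

            // Start the item without awaiting it, so slow items overlap.
            started.Add(factory());
        }

        var all = Task.WhenAll(started);
        try
        {
            return await all;
        }
        catch when (all.Exception != null)
        {
            // await rethrows only the first failure; all.Exception holds every one.
            throw all.Exception;
        }
    }
}
```

With the answer's `Defer` extension it could be called as `await PacedRunner.RunPacedAsync(Hosts.Defer(host => Dns.GetHostEntryAsync(host)), 25)`. The trade-off is simplicity versus latency: nothing is reported until the slowest lookup finishes.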
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Threading;
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
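public static class EnumerableExtensions
{
    // Wraps each element in a factory so the work does not start until the
    // factory is invoked (here: when RateLimit decides it is time).
    public static IEnumerable<Func<U>> Defer<T, U>
        (this IEnumerable<T> source, Func<T, U> selector)
        => source.Select(s => (Func<U>)(() => selector(s)));
}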
public class Program
{
    /// <summary>
    /// Returns the time to wait before processing another item
    /// if the rate limit is to be maintained
    /// </summary>
    /// <param name="desiredRateLimit">Maximum number of items to start per second.</param>
    /// <param name="currentItemCount">1-based index of the item about to start.</param>
    /// <param name="elapsedTotalSeconds">Seconds elapsed since the stopwatch was started.</param>
    /// <returns>Seconds to wait; zero or negative means start immediately.</returns>
    private static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds)
    {
        // Item n is due at n / rate seconds after the start.
        var timeout = currentItemCount / desiredRateLimit;
        return timeout - elapsedTotalSeconds;
    }
    /// <summary>
    /// Consume the tasks in parallel but with a rate limit. The results
    /// are returned as an observable.
    /// </summary>
    /// <typeparam name="T">Result type of each task.</typeparam>
    /// <param name="tasks">Factories that start the work when invoked.</param>
    /// <param name="rateLimit">Maximum number of tasks to start per second.</param>
    /// <returns>An observable that emits each result as it arrives, then completes,
    /// or errors with an AggregateException if any task failed.</returns>
    public static IObservable<T> RateLimit<T>(IEnumerable<Func<Task<T>>> tasks, double rateLimit)
    {
        return Observable.Create<T>(observer =>
        {
            // Per-subscription state, so two subscriptions do not share counters.
            var s = Stopwatch.StartNew();
            var n = 0;
            // Starts at 1 for the producer loop; each started task adds 1.
            var sem = new AsyncCountdownEvent(1);
            var errors = new ConcurrentBag<Exception>();
            // Rx requires OnNext calls to be serialized, but tasks finish concurrently.
            var gate = new object();
            var ctx = new CancellationTokenSource();

            Task.Run(async () =>
            {
                foreach (var taskFn in tasks)
                {
                    n++;
                    ctx.Token.ThrowIfCancellationRequested();
                    var delay = Delay(rateLimit, n, s.Elapsed.TotalSeconds);
                    if (delay > 0)
                        await Task.Delay(TimeSpan.FromSeconds(delay), ctx.Token);

                    sem.AddCount(1);
                    Task.Run(async () =>
                    {
                        try
                        {
                            var result = await taskFn();
                            lock (gate)
                                observer.OnNext(result);
                        }
                        catch (Exception e)
                        {
                            errors.Add(e);
                        }
                        finally
                        {
                            sem.Signal();
                        }
                    }, ctx.Token);
                }

                // Release the producer's own count, then wait for all in-flight tasks.
                sem.Signal();
                await sem.WaitAsync(ctx.Token);
                if (errors.Count > 0)
                    observer.OnError(new AggregateException(errors));
                else
                    observer.OnCompleted();
            }, ctx.Token);

            return Disposable.Create(() => ctx.Cancel());
        });
    }
    #region hosts
    // Shortened here; the full host list is in the gist linked below.
    public static string[] Hosts = new[] { "google.com" };
    #endregion
    public static void Main()
    {
        var s = Stopwatch.StartNew();
        var rate = 25;
        var n = Hosts.Length;
        // Integer division: the expected time is truncated to whole seconds.
        var expectedTime = n / rate;

        // Each factory starts one DNS lookup when invoked; failures are wrapped
        // so the error report names the host that could not be resolved.
        IEnumerable<Func<Task<IPHostEntry>>> dnsTaskFactories = Hosts.Defer(async host =>
        {
            try
            {
                return await Dns.GetHostEntryAsync(host);
            }
            catch (Exception e)
            {
                throw new Exception($"Can't resolve {host}", e);
            }
        });

        IObservable<IPHostEntry> results = RateLimit(dnsTaskFactories, rate);
        results.Subscribe(
            result =>
            {
                Console.WriteLine("result " + DateTime.Now + " " + result.AddressList[0].ToString());
            },
            onCompleted: () =>
            {
                Console.WriteLine("Completed");
                PrintTimes(s, expectedTime);
            },
            onError: e =>
            {
                Console.WriteLine("Errored");
                PrintTimes(s, expectedTime);
                if (e is AggregateException ae)
                {
                    Console.WriteLine(e.Message);
                    foreach (var innerE in ae.InnerExceptions)
                    {
                        Console.WriteLine(" " + innerE.GetType().Name + " " + innerE.Message);
                    }
                }
                else
                {
                    Console.WriteLine("got error " + e.Message);
                }
            });

        Console.WriteLine("Press enter to exit");
        Console.ReadLine();
    }
    private static void PrintTimes(Stopwatch s, int expectedTime)
    {
        Console.WriteLine("Done");
        Console.WriteLine("Elapsed Seconds " + s.Elapsed.TotalSeconds);
        Console.WriteLine("Expected Elapsed Seconds " + expectedTime);
    }
}
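The pacing rule in `Delay` can be written out as follows, where $r$ is `desiredRateLimit`, $n$ is `currentItemCount` and $t_n$ is `elapsedTotalSeconds` at the moment item $n$ comes up:

$$
d_n = \frac{n}{r} - t_n
$$

The loop only waits when $d_n > 0$, so item $n$ starts no earlier than $n/r$ seconds after the stopwatch `s` was started, and $N$ items need at least $N/r$ seconds in total; that lower bound is what `Main` prints as "Expected Elapsed Seconds" (truncated by integer division). For example, with $r = 25$, $n = 50$ and $t_{50} = 1.5\ \text{s}$:

$$
d_{50} = \frac{50}{25} - 1.5 = 2 - 1.5 = 0.5\ \text{s}
$$

Because every start time is measured from the same fixed origin, a late item makes $d_n$ negative for the following items, which then start immediately until the schedule has caught up.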
The last few lines of the output are:
result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 157.7.105.52
result 5/23/2017 3:23:36 PM 223.223.182.225
result 5/23/2017 3:23:36 PM 64.34.93.5
result 5/23/2017 3:23:36 PM 212.83.211.103
result 5/23/2017 3:23:36 PM 205.185.216.10
result 5/23/2017 3:23:36 PM 198.232.125.32
result 5/23/2017 3:23:36 PM 66.231.176.100
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:37 PM 219.84.203.116
Errored
Done
Elapsed Seconds 19.9990118
Expected Elapsed Seconds 19
One or more errors occurred.
Exception Can't resolve adv758968.ru
Exception Can't resolve fr.a3dfp.net
Exception Can't resolve ads.adwitserver.com
Exception Can't resolve www.adtrader.com
Exception Can't resolve trak-analytics.blic.rs
Exception Can't resolve ads.buzzcity.net
I can't paste the full code here, so here is a link to the code including the full list of hosts.
https://gist.github.com/bradphelan/084e4b1ce2604bbdf858d948699cc190
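Have you considered a queue set up to process only one item every x, so that at most it runs 300 times a second (300 a second is quite a lot, after all)? – BugFinder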
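A minimal sketch of the queue idea from the comment above, assuming `System.Threading.Channels` is available (in-box on .NET Core 3.0 and later). `IntervalQueue` and `DrainAsync` are hypothetical names, not from the answer. Producers enqueue work, and a single consumer starts one item, then waits a fixed interval before starting the next.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: at most one queued item is started per interval.
public sealed class IntervalQueue<T>
{
    private readonly Channel<Func<Task<T>>> _channel =
        Channel.CreateUnbounded<Func<Task<T>>>();

    // Returns false if the queue has already been completed.
    public bool Enqueue(Func<Task<T>> work) => _channel.Writer.TryWrite(work);

    // Signals that no more work will be enqueued.
    public void Complete() => _channel.Writer.Complete();

    // Starts one queued item, then waits `interval` before starting the next.
    // Started items are not awaited here, so they may overlap; the caller
    // awaits the returned tasks.
    public async Task<List<Task<T>>> DrainAsync(TimeSpan interval, CancellationToken token = default)
    {
        var started = new List<Task<T>>();
        while (await _channel.Reader.WaitToReadAsync(token))
        {
            while (_channel.Reader.TryRead(out var work))
            {
                started.Add(work());
                await Task.Delay(interval, token);
            }
        }
        return started;
    }
}
```

For 300 items a second the interval would be `TimeSpan.FromSeconds(1.0 / 300)`, about 3.3 ms. Two caveats: `Task.Delay` cannot sleep that precisely (on Windows its resolution is typically around 15 ms), and fixed gaps add up, so every late wake-up permanently lowers the effective rate. The `Delay` helper in the answer avoids that drift by computing each start time from the total elapsed time.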
Possible duplicate of [Simple way to rate limit HttpClient requests](http://stackoverflow.com/questions/35493925/simple-way-to-rate-limit-httpclient-requests) – bradgonesurfing
Yes, a SemaphoreSlim and a timer could solve this problem. –
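A minimal sketch of the SemaphoreSlim-plus-timer suggestion, as a fixed-window limiter. `TimerRateLimiter` is a hypothetical name, not from the answer or any library. The semaphore holds up to `permitsPerPeriod` permits, and a `System.Threading.Timer` tops the used ones back up once per period.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: a fixed-window limiter, not the paced schedule the answer uses.
public sealed class TimerRateLimiter : IDisposable
{
    private readonly SemaphoreSlim _permits;
    private readonly Timer _refill;
    private readonly int _max;
    private readonly object _gate = new object();
    private bool _disposed;

    public TimerRateLimiter(int permitsPerPeriod, TimeSpan period)
    {
        _max = permitsPerPeriod;
        _permits = new SemaphoreSlim(permitsPerPeriod, permitsPerPeriod);
        _refill = new Timer(_ => Refill(), null, period, period);
    }

    // Waits until a permit is available in the current window.
    public Task WaitAsync(CancellationToken token = default) => _permits.WaitAsync(token);

    private void Refill()
    {
        // The lock stops overlapping timer callbacks from releasing the same
        // permits twice (which would throw SemaphoreFullException) and stops
        // a late callback from touching a disposed semaphore.
        lock (_gate)
        {
            if (_disposed) return;
            var used = _max - _permits.CurrentCount;
            if (used > 0)
                _permits.Release(used);
        }
    }

    public void Dispose()
    {
        lock (_gate)
        {
            if (_disposed) return;
            _disposed = true;
        }
        _refill.Dispose();
        _permits.Dispose();
    }
}
```

For the 300-per-second figure from the comments, one would create `new TimerRateLimiter(300, TimeSpan.FromSeconds(1))` and `await limiter.WaitAsync()` before each `Dns.GetHostEntryAsync(host)`. Because permits come back all at once, up to 300 lookups can start together at the beginning of a window, and up to 600 in quick succession across a window boundary; if that burstiness matters, the paced schedule in the answer is smoother.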