我不認爲你真的需要直接TPL Tasks
趴下和骯髒這一點。對於初學者,我會在BlockingCollection
上設置一個BlockingCollection
約ConcurrentQueue
(默認值),而不設置BoundedCapacity
以存儲需要處理的ID。
// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();
從那裏我只想從BlockingCollection::GetConsumingEnumerable
返回的枚舉使用Parallel::ForEach
。在ForEach
調用中,您將設置您的ParallelOptions::MaxDegreeOfParallelism
在ForEach
的正文中,您將執行您的存儲過程。
現在,一旦存儲過程執行完成,你就說你不想重新計劃執行至少兩秒鐘。沒問題,安排System.Threading.Timer
回調,它將簡單地將ID添加回提供的回調中的BlockingCollection
。
Parallel.ForEach(
idsToProcess.GetConsumingEnumerable(),
new ParallelOptions
{
MaxDegreeOfParallelism = 4 // read this from config
},
(id) =>
{
// ... execute sproc ...
// Need to declare/assign this before the delegate so that we can dispose of it inside
Timer timer = null;
timer = new Timer(
_ =>
{
// Add the id back to the collection so it will be processed again
idsToProcess.Add(id);
// Cleanup the timer
timer.Dispose();
},
null, // no state, id wee need is "captured" in the anonymous delegate
2000, // probably should read this from config
Timeout.Infinite);
}
最後,當進程正在關閉,你會打電話BlockingCollection::CompleteAdding
從而使枚舉正在與停止阻止和完整,並行處理::的ForEach將退出。例如,如果這是Windows服務,您可以在OnStop
中執行此操作。
// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();
更新
你提出你的意見很關注,你可能在任何給定的點來處理大量的ID,並擔心會有過多的開銷,每ID的計時器。我完全同意這一點。因此,在您同時處理ID的大名單的情況下,我會使用一個計時器,每ID使用另一個隊列來保存這是由一個單一的短間隔定時監控,而不是「沉睡」的ID改變。首先,您需要一個ConcurrentQueue
在其中放置是睡着的ID:
ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();
現在,我使用的是兩部分Tuple
這裏用於說明目的,但你可能要創建一個更強類型結構爲它(或用using
聲明至少它的別名)爲更好的可讀性。元組有ID,當它被放入隊列代表一個DateTime。
現在,你還需要設置,將監視此隊列中的計時器:
Timer wakeSleepingIdsTimer = new Timer(
_ =>
{
DateTime utcNow = DateTime.UtcNow;
// Pull all items from the sleeping queue that have been there for at least 2 seconds
foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2))
{
// Add this id back to the processing queue
idsToProcess.Enqueue(id);
}
},
null, // no state
Timeout.Infinite, // no due time
100 // wake up every 100ms, probably should read this from config
);
,那麼只需在改變Parallel::ForEach
做到以下幾點,而不是設置一個計時器爲每個:
(id) =>
{
// ... execute sproc ...
sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow));
}
反應性編程 – 2011-07-01 21:22:46