2011-06-10 184 views
6

我目前正在研究一個項目,在那裏我們有挑戰來並行處理項目。到目前爲止沒有什麼大不了的;) 現在到了這個問題。我們有一個ID列表,我們定期(每2秒)爲每個ID調用一個StoredProcedure。 需要爲每個項目單獨檢查2秒,因爲它們是在運行時添加和刪除的。 另外我們要配置最大並行度,因爲數據庫不應該同時被300個線程充斥。 正在處理的項目不應被重新計劃處理,直到完成前一個執行。原因是我們想要防止排隊很多項目,以防DB延遲。TPL體系結構問題

現在我們正在使用一個自主開發的組件,它有一個主線程,它定期檢查哪些項目需要安排處理。一旦它有了這個列表,它將把它們放在一個自定義的基於IOCP的線程池中,然後使用waithandles等待正在處理的項目。然後下一次迭代開始。 IOCP,因爲它提供了工作竊取。

我想用TPL/.NET 4版本替換這個自定義實現,我想知道你將如何解決它(理想情況下簡單,很好可讀/可維護)。 我知道這篇文章:http://msdn.microsoft.com/en-us/library/ee789351.aspx,但它只是限制正在使用的線程數量。葉工作偷竊,定期執行的項目....

理想情況下,它將成爲一個通用的組件,可以用於一些所有需要定期完成的項目列表的任務。

任何輸入歡迎, TIA 馬丁

+0

反應性編程 – 2011-07-01 21:22:46

回答

9

我不認爲你真的需要直接TPL Tasks趴下和骯髒這一點。對於初學者,我會在BlockingCollection上設置一個BlockingCollectionConcurrentQueue(默認值),而不設置BoundedCapacity以存儲需要處理的ID。

// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service) 
BlockingCollection<string> idsToProcess = new BlockingCollection<string>(); 

從那裏我只想從BlockingCollection::GetConsumingEnumerable返回的枚舉使用Parallel::ForEach。在ForEach調用中,您將設置您的ParallelOptions::MaxDegreeOfParallelismForEach的正文中,您將執行您的存儲過程。

現在,一旦存儲過程執行完成,你就說你不想重新計劃執行至少兩秒鐘。沒問題,安排System.Threading.Timer回調,它將簡單地將ID添加回提供的回調中的BlockingCollection

Parallel.ForEach(
    idsToProcess.GetConsumingEnumerable(), 
    new ParallelOptions 
    { 
     MaxDegreeOfParallelism = 4 // read this from config 
    }, 
    (id) => 
    { 
     // ... execute sproc ... 

     // Need to declare/assign this before the delegate so that we can dispose of it inside 
     Timer timer = null; 

     timer = new Timer(
      _ => 
      { 
       // Add the id back to the collection so it will be processed again 
       idsToProcess.Add(id); 

       // Cleanup the timer 
       timer.Dispose(); 
      }, 
      null, // no state, id wee need is "captured" in the anonymous delegate 
      2000, // probably should read this from config 
      Timeout.Infinite); 
    } 

最後,當進程正在關閉,你會打電話BlockingCollection::CompleteAdding從而使枚舉正在與停止阻止和完整,並行處理::的ForEach將退出。例如,如果這是Windows服務,您可以在OnStop中執行此操作。

// When ready to shutdown you just signal you're done adding 
idsToProcess.CompleteAdding(); 

更新

你提出你的意見很關注,你可能在任何給定的點來處理大量的ID,並擔心會有過多的開銷,每ID的計時器。我完全同意這一點。因此,在您同時處理ID的大名單的情況下,我會使用一個計時器,每ID使用另一個隊列來保存這是由一個單一的短間隔定時監控,而不是「沉睡」的ID改變。首先,您需要一個ConcurrentQueue在其中放置是睡着的ID:

ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>(); 

現在,我使用的是兩部分Tuple這裏用於說明目的,但你可能要創建一個更強類型結構爲它(或用using聲明至少它的別名)爲更好的可讀性。元組有ID,當它被放入隊列代表一個DateTime。

現在,你還需要設置,將監視此隊列中的計時器:

Timer wakeSleepingIdsTimer = new Timer(
    _ => 
    { 
     DateTime utcNow = DateTime.UtcNow; 

     // Pull all items from the sleeping queue that have been there for at least 2 seconds 
     foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2)) 
     { 
      // Add this id back to the processing queue 
      idsToProcess.Enqueue(id); 
     } 
    }, 
    null, // no state 
    Timeout.Infinite, // no due time 
    100 // wake up every 100ms, probably should read this from config 
); 

,那麼只需在改變Parallel::ForEach做到以下幾點,而不是設置一個計時器爲每個:

(id) => 
{ 
     // ... execute sproc ... 

     sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow)); 
} 
+0

好主意,但你不覺得這會造成一點資源問題嗎?我的意思是如果我有列表中的500個元素,我有點擔心大額正在運行的定時器... – 2011-06-14 07:27:34

+0

想過,但你並沒有給出明確的界限,所以我在等待一個響應,這是否還是沒有遇見你需要。您可以輕鬆地與另一隊列和單計時器,監視是由於項目的隊列和移動它們拖回的主要工作隊列解決這個問題。將添加詳細信息到我的答案。 – 2011-06-14 14:27:04

1

這是非常相似的,你說你已經有了你的問題的辦法,但TPL任務這樣做。一項任務只是將其自身添加回到要安排的事項清單中。

使用鎖定一個普通的名單上的是在這個例子相當難看,可能會希望有一個更好的收集保存的事情的清單來安排

// Fill the idsToSchedule 
for (int id = 0; id < 5; id++) 
{ 
    idsToSchedule.Add(Tuple.Create(DateTime.MinValue, id)); 
} 

// LongRunning will tell TPL to create a new thread to run this on 
Task.Factory.StartNew(SchedulingLoop, TaskCreationOptions.LongRunning); 

啓動了SchedulingLoop,其實際執行檢查是否已經跑了兩秒鐘

// Tuple of the last time an id was processed and the id of the thing to schedule 
static List<Tuple<DateTime, int>> idsToSchedule = new List<Tuple<DateTime, int>>(); 
static int currentlyProcessing = 0; 
const int ProcessingLimit = 3; 

// An event loop that performs the scheduling 
public static void SchedulingLoop() 
{ 
    while (true) 
    { 
     lock (idsToSchedule) 
     { 
      DateTime currentTime = DateTime.Now; 
      for (int index = idsToSchedule.Count - 1; index >= 0; index--) 
      { 
       var scheduleItem = idsToSchedule[index]; 
       var timeSincePreviousRun = (currentTime - scheduleItem.Item1).TotalSeconds; 

       // start it executing in a background task 
       if (timeSincePreviousRun > 2 && currentlyProcessing < ProcessingLimit) 
       { 
        Interlocked.Increment(ref currentlyProcessing); 

        Console.WriteLine("Scheduling {0} after {1} seconds", scheduleItem.Item2, timeSincePreviousRun); 

        // Schedule this task to be processed 
        Task.Factory.StartNew(() => 
         { 
          Console.WriteLine("Executing {0}", scheduleItem.Item2); 

          // simulate the time taken to call this procedure 
          Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(0, 5000) + 500); 

          lock (idsToSchedule) 
          { 
           idsToSchedule.Add(Tuple.Create(DateTime.Now, scheduleItem.Item2)); 
          } 

          Console.WriteLine("Done Executing {0}", scheduleItem.Item2); 
          Interlocked.Decrement(ref currentlyProcessing); 
         }); 

        // remove this from the list of things to schedule 
        idsToSchedule.RemoveAt(index); 
       } 
      } 
     } 

     Thread.Sleep(100); 
    } 
}