2010-07-16 88 views
0

我想我可能需要重新考慮我的設計。我很難縮小一個導致我的電腦完全掛起的錯誤,有時會拋出VS 2010的HRESULT 0x8007000E。線程池/ WaitHandle資源泄漏/崩潰

我有一個控制檯應用程序(我將稍後轉換爲服務)基於數據庫隊列傳輸文件。

我限制了允許傳輸的線程。這是因爲我們連接的某些系統只能包含來自特定帳戶的一定數量的連接。

例如,系統A只能接受3個同時連接(這意味着3個獨立的線程)。這些線程中的每一個都有自己獨特的連接對象,所以我們不應該遇到任何同步問題,因爲它們沒有共享連接。

我們想循環處理來自這些系統的文件。因此,例如,我們將允許3個連接,每個連接最多可傳輸100個文件。這意味着,要從系統A移動1000個文件,我們每個週期只能處理300個文件,因爲每個文件允許3個線程,每個文件有100個文件。因此,在這個轉移的整個生命週期中,我們將有10個線程。我們一次只能運行3個。所以,會有3個週期,最後一個週期只會使用1個線程來傳輸最後的100個文件。 (3個線程×100個=文件每循環300名的文件)

現行的結構通過例子是:

  1. 甲System.Threading.Timer檢查每5秒的東西通過調用GetScheduledTask做隊列()
  2. 如果沒有什麼,GetScheduledTask()簡單地什麼也不做
  3. 如果有工作,創建一個線程池線程來處理工作[工作線程A]
  4. 工作線程A看到,有1000個文件傳輸
  5. 工作線程A看到,它只能有3個線程中運行的系統中,它從
  6. 工作線程A獲取文件啓動三個新的工作線程[B,C,d]和轉讓
  7. 工作線程用於等待B,C,d [WaitHandle.WaitAll(transfersArray)]
  8. 工作線程A看到有隊列更文件(應該是700現在)
  9. 工作線程A創建一個新的陣列上等待[transfersArray = new TransferArray[3]這是最大的系統A,但可能會在系統上有所不同
  10. 工作線程A啓動三個新工作線程[B,C,D]並等待它們[WaitHandle.WaitAll(transfersArray)]
  11. 該過程重複,直到沒有更多的文件要移動。
  12. ,它是做

工作線程A信號,我使用的ManualResetEvent來處理信號。

我的問題是:

  1. 是否有任何明顯的情況下,這將導致該我遇到資源泄漏或問題?
  2. 我應該循環數組通每一個WaitHandle.WaitAll(array)後,並呼籲array[index].Dispose()?
  3. 任務管理器下句柄計數此過程中慢慢爬行
  4. 我打電話工作者線程A的從的System.Threading初始創建。計時器。這會有什麼問題嗎?該定時器的代碼是:

(用於調度一些類代碼)

private ManualResetEvent _ResetEvent; 

private void Start() 
{ 
    _IsAlive = true; 
    ManualResetEvent transferResetEvent = new ManualResetEvent(false); 
    //Set the scheduler timer to 5 second intervals 
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000); 
} 

private void ScheduledTasks_Tick(object state) 
{ 
    ManualResetEvent resetEvent = null; 
    try 
    { 
     resetEvent = (ManualResetEvent)state; 
     //Block timer until GetScheduledTasks() finishes 
     _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite); 
     GetScheduledTasks(); 
    } 
    finally 
    { 
     _ScheduledTasks.Change(5000, 5000); 
     Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff")); 
     resetEvent.Set(); 
    } 
} 


private void GetScheduledTask() 
{ 
    try 
    { 
     //Check to see if the database connection is still up 
     if (!_IsAlive) 
     { 
      //Handle 
      _ConnectionLostNotification = true; 
      return; 
     } 

     //Get scheduled records from the database 
     ISchedulerTask task = null; 

     using (DataTable dt = FastSql.ExecuteDataTable(
       _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure, 
       new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class 
     { 
      if (dt != null) 
      { 
       if (dt.Rows.Count == 1) 
       { //Only 1 row is allowed 
        DataRow dr = dt.Rows[0]; 

        //Get task information 
        TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString()); 
        task = ScheduledTaskFactory.CreateScheduledTask(taskType); 

        task.Description = dr["Description"].ToString(); 
        task.IsEnabled = (bool)dr["IsEnabled"]; 
        task.IsProcessing = (bool)dr["IsProcessing"]; 
        task.IsManualLaunch = (bool)dr["IsManualLaunch"]; 
        task.ProcessMachineName = dr["ProcessMachineName"].ToString(); 
        task.NextRun = (DateTime)dr["NextRun"]; 
        task.PostProcessNotification = (bool)dr["NotifyPostProcess"]; 
        task.PreProcessNotification = (bool)dr["NotifyPreProcess"]; 
        task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString()); 
        task.SleepMinutes = (int)dr["SleepMinutes"]; 
        task.ScheduleId = (int)dr["ScheduleId"]; 
        task.CurrentRuns = (int)dr["CurrentRuns"]; 
        task.TotalRuns = (int)dr["TotalRuns"]; 

        SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task); 
        //Queue up task to worker thread and start 
        ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);  
       } 
      } 
     } 

    } 
    catch (Exception ex) 
    { 
     //Handle 
    } 
} 

private void ThreadProc(object taskObject) 
{ 
    SchedulerTask task = (SchedulerTask)taskObject; 
    ScheduledTaskEngine engine = null; 
    try 
    { 
     engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString); 
     engine.StartTask(task.Task);  
    } 
    catch (Exception ex) 
    { 
     //Handle 
    } 
    finally 
    { 
     task.TaskResetEvent.Set(); 
     task.TaskResetEvent.Dispose(); 
    } 
} 
+0

好像這是一個編碼錯誤,與聲明重置事件數組有關。我是做 'ManualResetEvent的[]事件=新ManualResetEvents [統計]' 而不是 '的WaitHandle [] =事件的WaitHandle新[計]' – 2010-07-19 18:16:01

回答

0

事實證明,這個奇怪問題的來源與體系結構無關,而是因爲將解決方案從3.5轉換爲4.0。我重新創建瞭解決方案,不進行代碼更改,並且問題再也沒有發生過。

2

0x8007000E是存儲器外的錯誤。這和處理計數似乎指向資源泄漏。確保你正在處理實現IDisposable的每個對象。這包括您正在使用的ManualResetEvent的陣列。

如果您有時間,您可能還想轉換爲使用.NET 4.0 Task類;它被設計來處理像這樣更復雜的場景。通過定義子對象,可以減少總體線程數(線程非常昂貴,不僅因爲調度,還因爲它們的堆棧空間)。

0

我認爲你應該重新考慮你的架構。您只能同時連接3個事實幾乎要求您使用1個線程來生成文件列表和3個線程來處理它們。您的生產者線程會將所有文件插入到隊列中,並且3個使用者線程將出隊並繼續處理,因爲物品到達隊列中。阻塞隊列可以顯着簡化代碼。如果您使用.NET 4.0,那麼您可以利用BlockingCollection類。

public class Example 
{ 
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>(); 

    public void Start() 
    { 
     var threads = new Thread[] 
      { 
       new Thread(Producer), 
       new Thread(Consumer), 
       new Thread(Consumer), 
       new Thread(Consumer) 
      }; 
     foreach (Thread thread in threads) 
     { 
      thread.Start(); 
     } 
    } 

    private void Producer() 
    { 
     while (true) 
     { 
      Thread.Sleep(TimeSpan.FromSeconds(5)); 
      ScheduledTask task = GetScheduledTask(); 
      if (task != null) 
      { 
       foreach (string file in task.Files) 
       { 
        m_Queue.Add(task); 
       } 
      } 
     } 
    } 

    private void Consumer() 
    { 
     // Make a connection to the resource that is assigned to this thread only. 
     while (true) 
     { 
      string file = m_Queue.Take(); 
      // Process the file. 
     } 
    } 
} 

我在上面的例子中絕對過分簡化了一些東西,但我希望你能得到一般的想法。請注意,這是非常簡單的,因爲線程同步的方式並不多(大多數將嵌入到阻塞隊列中),當然也不會使用對象。很明顯,你將不得不添加正確的機制來優雅地關閉線程,但這應該相當容易。

1

我正在尋找類似問題的解答(處理計數隨時間而增加)。

我看了一下您的應用程序架構和想建議你的東西,可以幫助你:

你聽說過IOCP(輸入輸出完成端口)。

我不確定使用C#實現這一點,但在C/C++中它是一塊蛋糕。 通過使用它可以創建一個唯一的線程池(該池中的線程數一般定義爲2 x PC或服務器中處理器或處理器核的數量) 將此池與IOCP句柄關聯並將池做這項工作。 查看這些功能的幫助: CreateIoCompletionPort(); PostQueuedCompletionStatus(); GetQueuedCompletionStatus();

一般情況下創建和退出線程可能非常耗時並導致性能損失和內存碎片。 有關於IOCP在MSDN和谷歌的成千上萬的文獻。