3

我是一個堅信重新學習的學習者。以這種心態,我着手實現自定義線程池。我爲自己設定的目標如下:學習實現線程池 - 使用autoresetevent時發生信號丟失的事件丟失

  1. 爲了能夠在線程池中排隊工作項目。
  2. 能夠處理具有固定數量線程的工作項 - 全部同時創建。
  3. 通用工作線程函數應該只知道如何使用deque,而不應該處理其他函數/屬性,如 IsEmpty或Count。

我成功地實現了上述目標,但想驗證我在專家的方法上使用了stackoverflow。另外,想知道是否有更好的方法,或者多線程專家如何解決這個問題。以下段落提到了我面臨的挑戰以及我如何解決這個問題。

我創建的線程池在內部維護了一個工作項目隊列,所有工作線程都從中選取項目並處理它。每當一個新項目排隊時,它就會發出一個事件的信號,以便任何空閒的線程都可以把它撿起來並執行它。

我開始使用autoresetevent向隊列中的任何新工作項目發出等待線程的信號,但是我遇到了丟失信號事件的問題。它發生在多個項目排隊並且沒有空閒線程處理項目時發生。未處理的總項目與由於重疊設置(信令)事件而丟失的總信號事件相同。

爲了解決丟失信號事件的問題,我在autoresetevent的基礎上創建了一個包裝器,並用它來代替autoresetevent。它解決了這個問題。這裏是代碼爲同一上市:

public static class CustomThreadPool 
{ 
    static CustomThreadPool() 
    { 
     for (int i = 0; i < minThreads; i++) 
      _threads.Add(
       new Thread(ThreadFunc) { IsBackground = true } 
       ); 

     _threads.ForEach((t) => t.Start()); 
    } 

    public static void EnqueWork(Action action) 
    { 
     _concurrentQueue.Enqueue(action); 
     _enqueEvent.Set(); 
    } 

    private static void ThreadFunc() 
    { 
     Action action = null; 
     while (true) 
     { 
      _enqueEvent.WaitOne(); 
      _concurrentQueue.TryDequeue(out action); 
      action(); 
     } 
    } 

    private static ConcurrentQueue<Action> _concurrentQueue = new ConcurrentQueue<Action>(); 
    private static List<Thread> _threads = new List<Thread>(); 
    private static CountAutoResentEvent _enqueEvent = new CountAutoResentEvent(); 
    private static object _syncObject = new object(); 
    private const int minThreads = 4; 
    private const int maxThreads = 10; 

    public static void Test() 
    { 
     CustomThreadPool.EnqueWork(() => { 

      for (int i = 0; i < 10; i++) Console.WriteLine(i); 
      Console.WriteLine("****First*****"); 
     }); 

     CustomThreadPool.EnqueWork(() => 
     { 
      for (int i = 0; i < 10; i++) Console.WriteLine(i); 
      Console.WriteLine("****Second*****"); 
     }); 

     CustomThreadPool.EnqueWork(() => 
     { 
      for (int i = 0; i < 10; i++) Console.WriteLine(i); 
      Console.WriteLine("****Third*****"); 
     }); 

     CustomThreadPool.EnqueWork(() => 
     { 
      for (int i = 0; i < 10; i++) Console.WriteLine(i); 
      Console.WriteLine("****Fourth*****"); 
     }); 

     CustomThreadPool.EnqueWork(() => 
     { 
      for (int i = 0; i < 10; i++) Console.WriteLine(i); 
      Console.WriteLine("****Fifth*****"); 
     }); 
    } 
} 

public class CountAutoResentEvent 
{ 
    public void Set() 
    { 
     _event.Set(); 
     lock (_sync) 
      _countOfSet++; 
    } 

    public void WaitOne() 
    { 
     _event.WaitOne(); 
     lock (_sync) 
     { 
      _countOfSet--; 
      if (_countOfSet > 0) 
       _event.Set(); 
     } 
    } 

    private AutoResetEvent _event = new AutoResetEvent(false); 
    private int _countOfSet = 0; 
    private object _sync = new object(); 
} 

現在,我有幾個問題:

  1. 是我的做法充分證明?
  2. 什麼同步機制最適合這個問題,爲什麼?
  3. 多線程專家如何處理這個問題?

謝謝。

+0

@Hans Passant:在我看來,這是談論一個不同的問題。這個問題有幾點。 – Tudor 2011-12-17 09:00:29

回答

1

從我所看到的我會說這是正確的。我喜歡你已經使用ConcurrentQueue,並沒有去實現你自己的同步隊列。這是一團糟,很可能不會像現有的那樣快。

我只想說明您的自定義「信號機制」實際上與信號量非常相似:允許多個線程進入臨界區的鎖。該功能已存在於Semaphore類中。

+0

@Hans Passant:他沒有說這是不正確的。他問它是否正確。爲什麼信號不正確?基本上他實施的是帶有計數器的事件。 – Tudor 2011-12-17 09:26:20