2009-01-19 65 views
0

我創建了一個自定義線程池實用程序,但似乎有一個問題,我無法找到。我的自定義線程池出了什麼問題?

using System; 
using System.Collections; 
using System.Collections.Generic; 
using System.Threading; 

namespace iWallpaper.S3Uploader 
{ 
public class QueueManager<T> 
{ 
    private readonly Queue queue = Queue.Synchronized(new Queue()); 
    private readonly AutoResetEvent res = new AutoResetEvent(true); 
    private readonly AutoResetEvent res_thr = new AutoResetEvent(true); 
    private readonly Semaphore sem = new Semaphore(1, 4); 
    private readonly Thread thread; 
    private Action<T> DoWork; 
    private int Num_Of_Threads; 

    private QueueManager() 
    { 
     Num_Of_Threads = 0; 
     maxThread = 5; 
     thread = new Thread(Worker) {Name = "S3Uploader EventRegisterer"}; 
     thread.Start(); 

     // log.Info(String.Format("{0} [QUEUE] FileUploadQueueManager created", DateTime.Now.ToLongTimeString())); 
    } 

    public int maxThread { get; set; } 

    public static FileUploadQueueManager<T> Instance 
    { 
     get { return Nested.instance; } 
    } 

    /// <summary> 
    /// Executes multythreaded operation under items 
    /// </summary> 
    /// <param name="list">List of items to proceed</param> 
    /// <param name="action">Action under item</param> 
    /// <param name="MaxThreads">Maximum threads</param> 
    public void Execute(IEnumerable<T> list, Action<T> action, int MaxThreads) 
    { 
     maxThread = MaxThreads; 
     DoWork = action; 
     foreach (T item in list) 
     { 
      Add(item); 
     } 
    } 
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action) 
    { 
     ExecuteNoThread(list, action, 0); 
    } 
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action, int MaxThreads) 
    { 
     foreach (T wallpaper in list) 
     { 
      action(wallpaper); 
     } 
    } 
    /// <summary> 
    /// Default 10 threads 
    /// </summary> 
    /// <param name="list"></param> 
    /// <param name="action"></param> 
    public void Execute(IEnumerable<T> list, Action<T> action) 
    { 
     Execute(list, action, 10); 
    } 

    private void Add(T item) 
    { 
     lock (queue) 
     { 
      queue.Enqueue(item); 
     } 
     res.Set(); 
    } 

    private void Worker() 
    { 
     while (true) 
     { 
      if (queue.Count == 0) 
      { 
       res.WaitOne(); 
      } 

      if (Num_Of_Threads < maxThread) 
      { 
       var t = new Thread(Proceed); 
       t.Start(); 
      } 
      else 
      { 
       res_thr.WaitOne(); 
      } 
     } 
    } 

    private void Proceed() 
    { 
     Interlocked.Increment(ref Num_Of_Threads); 
     if (queue.Count > 0) 
     { 
      var item = (T) queue.Dequeue(); 

      sem.WaitOne(); 
      ProceedItem(item); 
      sem.Release(); 
     } 
     res_thr.Set(); 
     Interlocked.Decrement(ref Num_Of_Threads); 
    } 

    private void ProceedItem(T activity) 
    { 
     if (DoWork != null) 
      DoWork(activity); 

     lock (Instance) 
     { 
      Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}", 
              thread.ManagedThreadId, DateTime.Now, queue.Count, activity, 
              Num_Of_Threads); 
     } 
    } 

    #region Nested type: Nested 

    protected class Nested 
    { 
     // Explicit static constructor to tell C# compiler 
     // not to mark type as beforefieldinit 
     internal static readonly QueueManager<T> instance = new FileUploadQueueManager<T>(); 
    } 

    #endregion 

} 

}

問題是在這裏:

Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}", 
             thread.ManagedThreadId, DateTime.Now, queue.Count, activity, 
             Num_Of_Threads); 

中始終有標題一個線程ID。程序似乎在一個線程中工作。

使用範例:

 var i_list = new int[] {1, 2, 4, 5, 6, 7, 8, 6}; 
     QueueManager<int>.Instance.Execute(i_list, 
      i => 
      { 
       Console.WriteLine("Some action under element number {0}", i); 

      }, 5); 

P.S:這是相當混亂,但我仍然能夠正常工作。

+0

爲什麼你不只是在.NET中使用內置的線程池類? – GEOCHET 2009-01-19 13:34:26

回答

4

編寫健壯線程代碼是不平凡的。有許多線程池可供您參考,但也請注意並行擴展(可用作.NET Framework中的CTP或更高版本)包括許多其他線程構造,這些線程構造是開箱即用的(在TPL/CCR)。例如,Parallel.For/Parallel.ForEach,它涉及工作竊取以及有效處理可用內核。

有關預捲動線程池的示例,請參閱Jon Skeet的CustomThreadPoolhere

+0

不要忘記SmartThreadPool這是相當不錯的。 – user7116 2009-01-19 14:16:34

+0

感謝您的有用鏈接和導航^ _^ – AlfeG 2009-01-19 20:25:23

1

你應該使用內建的線程池。當你運行你的代碼時,我注意到你正在增加一堆線程,但是由於隊列的數量是< 1,你只要退出,這個過程會持續下去,直到隊列實際填充完畢,那麼你的下一個線程將處理所有事情。這是一個非常昂貴的過程。如果你有事要做,你只應該啓動線程。

+0

如果內置線程池不是靜態類,並且可以創建多個線程池對象,那麼內置線程池將非常棒。 – yfeldblum 2009-01-19 13:59:11

5

我翻看了你的代碼,這裏有幾個我看到的問題。

  1. 即使它是同步隊列,也會鎖定隊列對象。這是不必要的
  2. 您不一致地鎖定隊列對象。它應該被鎖定每個訪問或不鎖定,並根據同步的行爲。
  3. 繼續方法不是線程安全的。這兩行是問題

     
        if (queue.Count > 0) { 
         var item = (T)queue.Dequeue(); 
        ... 
        } 
    

    僅使用同步隊列可確保單個訪問是安全的。所以.Count和.Dequeue方法都不會混淆隊列的內部結構。然而想象兩個線程運行在同一時間這行代碼與計數的隊列1

    • 線程1的情況:如果(...) - >真正
    • 線程2:如果(...) - >真正
    • 線程1:離隊 - > sucess
    • 線程2:離隊 - >失敗,因爲隊列爲空
  4. Worker和Proceed之間存在可能導致死鎖的競爭條件。應該切換以下兩行代碼。

    代碼:

     
        res_thr.Set() 
        Interlocked.Decrement(ref Num_Of_Threads);

    第一行解鎖的輔助方法。如果運行速度足夠快,它將通過外觀返回,注意Num_Of_Threads maxThreads並返回res_thr.WaitOne()。如果沒有其他線程正在運行,那麼這將導致代碼中的死鎖。這是很容易打到最低線程數(例如1)。顛倒這兩行代碼可以解決這個問題。

  5. 的maxThread算財產似乎並沒有成爲SEM初始化對象只接受4最大併發條目超過4.有用。所有實際執行項目的代碼都必須經過這個信號量。因此,無論maxThread設置多高,您已經將併發項目的最大數量限制爲4。
2

我認爲你可以簡單的事情相當多。

下面是修改的形式(我沒有測試修改)我使用線程池:

唯一的同步。您需要的基元是一個Monitor,鎖定在線程池中。您不需要信號燈或重置事件。

internal class ThreadPool 
{ 
    private readonly Thread[] m_threads; 
    private readonly Queue<Action> m_queue; 
    private bool m_shutdown; 
    private object m_lockObj; 


    public ThreadPool(int numberOfThreads) 
    { 
     Util.Assume(numberOfThreads > 0, "Invalid thread count!"); 
     m_queue = new Queue<Action>(); 
     m_threads = new Thread[numberOfThreads]; 
     m_lockObj = new object(); 

     lock (m_lockObj) 
     { 
      for (int i = 0; i < numberOfWriteThreads; ++i) 
      { 
       m_threads[i] = new Thread(ThreadLoop); 
       m_threads[i].Start(); 
      } 
     } 

    } 

    public void Shutdown() 
    { 
     lock (m_lockObj) 
     { 
      m_shutdown = true; 
      Monitor.PulseAll(m_lockObj); 

      if (OnShuttingDown != null) 
      { 
       OnShuttingDown(); 
      } 
     } 
     foreach (var thread in m_threads) 
     { 
      thread.Join(); 
     } 
    } 
    public void Enqueue(Action a) 
    { 
     lock (m_lockObj) 
     { 
      m_queue.Enqueue(a); 
      Monitor.Pulse(m_lockObj); 
     } 
    } 

    private void ThreadLoop() 
    { 
     Monitor.Enter(m_lockObj); 

     while (!m_shutdown) 
     { 
      if (m_queue.Count == 0) 
      { 
       Monitor.Wait(m_lockObj); 
      } 
      else 
      { 
       var a = m_queue.Dequeue(); 
       Monitor.Pulse(m_lockObj); 
       Monitor.Exit(m_lockObj); 
       try 
       { 
        a(); 
       } 
       catch (Exception ex) 
       { 
        Console.WriteLine("An unhandled exception occured!\n:{0}", ex.Message, null); 
       } 
       Monitor.Enter(m_lockObj); 
      } 
     } 

     Monitor.Exit(m_lockObj); 
    } 
}