2011-04-29 64 views
5

我有一個處理大塊信息的子例程。爲了利用整個CPU,它將工作劃分爲單獨的線程。所有線程完成後,結束。我讀過創建和銷燬線程使用大量開銷,所以我嘗試使用線程池,但實際上運行速度比創建自己的線程慢。如何在程序運行時創建自己的線程,然後繼續重用它們?我見過一些人說它不能完成,但是線程池是這樣做的,所以它一定是可能的,對吧?如何在.NET 3.5中重用線程

這裏是啓動新線程的部分代碼/使用線程池:

//initialization for threads 
Thread[] AltThread = null; 
if (NumThreads > 1) 
    AltThread = new Thread[pub.NumThreads - 1]; 

do 
{ 
    if (NumThreads > 1) 
    { //split the matrix up into NumThreads number of even-sized blocks and execute on separate threads 
     int ThreadWidth = DataWidth/NumThreads; 
     if (UseThreadPool) //use threadpool threads 
     { 
      for (int i = 0; i < NumThreads - 1; i++) 
      { 
       ThreadPool.QueueUserWorkItem(ComputePartialDataOnThread, 
        new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) }); 
      } 
      //get number of threads available after queue 
      System.Threading.Thread.Sleep(0); 
      int StartThreads, empty, EndThreads; 
      ThreadPool.GetAvailableThreads(out StartThreads, out empty); 
      ComputePartialData(ThisEngine, 0, ThreadWidth); 

      //wait for all threads to finish 
      do 
      { 
       ThreadPool.GetAvailableThreads(out EndThreads, out empty); 
       System.Threading.Thread.Sleep(1); 
      } while (StartThreads - EndThreads > 0); 
     } 
     else //create new threads each time (can we reuse these?) 
     { 
      for (int i = 0; i < NumThreads - 1; i++) 
      { 
       AltThread[i] = new Thread(ComputePartialDataOnThread); 
       AltThread[i].Start(new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) }); 
      } 
      ComputePartialData(ThisEngine, 0, ThreadWidth); 

      //wait for all threads to finish 
      foreach (Thread t in AltThread) 
       t.Join(1000); 
      foreach (Thread t in AltThread) 
       if (t.IsAlive) t.Abort(); 
     } 
    } 
} 

ComputePartialDataOnThread只是解包的信息,並調用ComputePartialData。將被處理的數據在線程中共享(它們不會嘗試讀取/寫入相同的位置)。 AltEngine []是每個線程的獨立計算引擎。

該操作使用線程池運行約10-20%。

+3

你可以發佈你的代碼,以便我們可以看到你在做什麼?有可能你在線程池中做了一些錯誤,導致它很慢。 – 2011-04-29 01:42:06

+0

也許它只在你的測試運行中很慢,也就是說你點擊了最初的線程數,所以它必須創建更多的線程來滿足你的需求。在運行任何測試之前,嘗試手動設置池中的最小線程數。 – 2011-04-29 01:50:55

+0

線程的數量意味着匹配處理器內核的數量。在這種情況下,它只有2. – HypnoToad 2011-04-29 02:07:29

回答

11

這聽起來像它可以通過一個多線程的生產者 - 消費者隊列來解決一個相當普遍的要求。線程保持「活動」,並在新工作添加到隊列時發出信號通知工作。工作由一個代表(在你的情況下ComputePartialDataOnThread)和傳遞給委託的數據是什麼排隊(在你的情況下,參數ComputePartialDataOnThread)。有用的特性是管理工作線程的實現和實際的算法是分開的。這裏是P-c排隊:

public class SuperQueue<T> : IDisposable where T : class 
{ 
    readonly object _locker = new object(); 
    readonly List<Thread> _workers; 
    readonly Queue<T> _taskQueue = new Queue<T>(); 
    readonly Action<T> _dequeueAction; 

    /// <summary> 
    /// Initializes a new instance of the <see cref="SuperQueue{T}"/> class. 
    /// </summary> 
    /// <param name="workerCount">The worker count.</param> 
    /// <param name="dequeueAction">The dequeue action.</param> 
    public SuperQueue(int workerCount, Action<T> dequeueAction) 
    { 
     _dequeueAction = dequeueAction; 
     _workers = new List<Thread>(workerCount); 

     // Create and start a separate thread for each worker 
     for (int i = 0; i < workerCount; i++) 
     { 
      Thread t = new Thread(Consume) { IsBackground = true, Name = string.Format("SuperQueue worker {0}",i)}; 
      _workers.Add(t); 
      t.Start(); 
     } 
    } 

    /// <summary> 
    /// Enqueues the task. 
    /// </summary> 
    /// <param name="task">The task.</param> 
    public void EnqueueTask(T task) 
    { 
     lock (_locker) 
     { 
      _taskQueue.Enqueue(task); 
      Monitor.PulseAll(_locker); 
     } 
    } 

    /// <summary> 
    /// Consumes this instance. 
    /// </summary> 
    void Consume() 
    { 
     while (true) 
     { 
      T item; 
      lock (_locker) 
      { 
       while (_taskQueue.Count == 0) Monitor.Wait(_locker); 
       item = _taskQueue.Dequeue(); 
      } 
      if (item == null) return; 

      // run actual method 
      _dequeueAction(item); 
     } 
    } 

    /// <summary> 
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. 
    /// </summary> 
    public void Dispose() 
    { 
     // Enqueue one null task per worker to make each exit. 
     _workers.ForEach(thread => EnqueueTask(null)); 

     _workers.ForEach(thread => thread.Join()); 

    } 
} 

正如前面的海報說,有很多內置的結構(看TPL),它使用線程池,你可能想實現自己的隊列之前看。

+0

謝謝,這看起來是一個很好的做法。我會看看這個類是否比線程池快。 – HypnoToad 2011-04-29 21:18:24

+0

@DoctorZero:你成功使用它,速度更快嗎? – hofnarwillie 2013-08-21 12:14:47

+0

是的,在它的最終化身中,它有點不同,也更復雜,但是也是相同的想法。幾秒鐘不活動後線程進入低優先級模式,然後在不活動30秒後退出。這符合他們期望的工作類型和頻率。它的運行速度接近ThreadPool的速度,但不那麼緊張。此外,我發現我可以通過將數據分割成相鄰的行而不是大塊來獲得更好的CPU緩存,這也有所改變。 – HypnoToad 2013-11-16 13:39:49

2

所以通常的方式人會做到這一點是讓每個線程的入口點主要做一些類似(這只是一個算法,不C#代碼,抱歉)的東西:

  1. 檢查,看看是否有工作做
  2. 做的工作,如果發現信號

  • 等待對方,只要你有你的線程更多的工作,將其添加到工作隊列中埃森做,然後你的線程ce正在被重用。這與如何自行實現線程池非常相似(如果您在運行時可以做其他一些事情來幫助您,但這不是超級大事)。

  • +0

    如何在不佔用CPU的情況下等待信號? – HypnoToad 2011-04-29 02:09:47

    +0

    當您等待某個信號時,您只需將其設置爲阻止狀態,並在有人發出信號指示調度程序使您可以運行時。雖然阻塞你沒有時間在CPU上(但這不是最有效的東西,所以通常操作系統將只是忙着等一下)。被阻塞的線程不消耗處理器電源。 – 2011-04-29 02:35:37