2014-12-28 56 views
1

希望你們都有愉快的假期。可變線程的C++線程池:分配任務給threaads時出現奇怪的死鎖

這個問題涉及到我剛纔的問題:std::condition_variable - Wait for several threads to notify observer

我想基於以下我自己的可變線程執行來實現線程池:

class MutableThread 
{ 
private: 
    std::thread m_Thread; 
    std::function<void()> m_Function; 
    bool m_bRun; 
    std::mutex m_LockMutex; 
    std::mutex m_WaitMutex; 
    std::condition_variable m_CV; 
    IAsyncTemplateObserver<MutableThread>* m_Observer = nullptr; 

private: 
    void Execute() 
    { 
     while (m_bRun) 
     { 
      { 
       std::unique_lock<std::mutex> wait(m_WaitMutex); 
       m_CV.wait(wait); 
      }    

      std::lock_guard<std::mutex> lock(m_LockMutex); 
      if (m_bRun && m_Function) 
      { 
       m_Function(); 
       m_Function = std::function<void()>(); 

       if (m_Observer != nullptr) 
       { 
        m_Observer->Signal(this); 
       } 
      } 
     } 
    } 

public: 
    HDEBUGNAME(TEXT("MutableThread")); 

    MutableThread(const MutableThread& thread) = delete; 

    MutableThread(IAsyncTemplateObserver<MutableThread>* _Observer) 
    { 
     m_Observer = _Observer; 
     m_bRun = true; 
     m_Thread = std::thread(&MutableThread::Execute, this); 
    } 

    MutableThread() 
    { 
     m_Observer = nullptr; 
     m_bRun = true; 
     m_Thread = std::thread(&MutableThread::Execute, this); 
    }  

    ~MutableThread() 
    { 
     m_bRun = false; 

     m_CV.notify_one(); 

     try 
     { 
      if (m_Thread.joinable()) 
       m_Thread.join(); 
     } 
     catch (std::system_error& ex) 
     { 
      HWARNINGD(TEXT("%s"), ex.what()); 
     }       
    } 

    inline bool Start(const std::function<void()>& f) 
    { 
     std::lock_guard<std::mutex> lock(m_LockMutex); 

     if (m_Function != nullptr) 
      return false; 

     m_Function = f; 

     m_CV.notify_one(); 

     return true; 
    } 

的IAsyncTemplateObserver只是從我的IAsyncObserver類派生髮布在前面的問題,並添加了虛擬功能:

template <typename T> 
class IAsyncTemplateObserver : public IAsyncObserver 
{ 
public: 
    virtual void Signal(T* _Obj) = 0; 
}; 

我想要做的是,信號線程池的功能執行結束和一個新的任務被分配給了可變螺紋:

class MutableThread; 

struct Task 
{ 
    std::function<void()> m_Function; 
    uint32_t m_uPriority; 

    Task(const std::function<void()>& _Function, uint32_t _uPriority) 
    { 
     m_Function = _Function; 
     m_uPriority = _uPriority; 
    } 
}; 

inline bool operator<(const Task& lhs, const Task& rhs) 
{ 
    return lhs.m_uPriority < rhs.m_uPriority; 
} 

class ThreadPool : public IAsyncTemplateObserver<MutableThread> 
{ 
private: 
    std::list<MutableThread* > m_FreeThreads; 
    std::list<MutableThread* > m_UsedThreads; 

    std::set<Task> m_Tasks; 

    std::mutex m_LockMutex;  
public: 

    ThreadPool() 
    { 
     //Grow(std::thread::hardware_concurrency() - 1); 
    } 

    ThreadPool(size_t n) 
    { 
     Grow(n); 
    } 

    ~ThreadPool() 
    { 
     //std::lock_guard<std::mutex> lock(m_Mutex); 
     for (MutableThread* pUsed : m_UsedThreads) 
     { 
      HSAFE_DELETE(pUsed); 
     } 

     for (MutableThread* pFree : m_FreeThreads) 
     { 
      HSAFE_DELETE(pFree); 
     } 
    } 

    inline void Grow(size_t n) 
    { 
     std::lock_guard<std::mutex> lock(m_LockMutex); 

     for (size_t i = 0; i < n; i++) 
     { 
      m_FreeThreads.push_back(new MutableThread(this)); 
     } 
    } 

    inline void AddTask(const Task& _Task) 
    { 
     { 
      std::lock_guard<std::mutex> lock(m_LockMutex); 
      m_Tasks.insert(_Task); 
     } 

     AssignThreads(); 
    } 

    virtual void Signal(MutableThread* _pThread) 
    { 
     { 
      std::lock_guard<std::mutex> lock(m_LockMutex); 
      m_UsedThreads.remove(_pThread); 
      m_FreeThreads.push_back(_pThread); 
     } 

     AssignThreads(); 

     NotifyOne(); 
    } 

    inline void WaitForAllThreads() 
    { 
     bool bWait = true; 
     do 
     { 
      { 
       //check if we have to wait 
       std::lock_guard<std::mutex> lock(m_LockMutex); 
       bWait = !m_UsedThreads.empty() || !m_Tasks.empty(); 
      } 

      if (bWait) 
      {     
       std::unique_lock<std::mutex> wait(m_ObserverMutex); 
       m_ObserverCV.wait(wait); 
      } 

     } while (bWait); 
    } 

private: 

    inline void AssignThreads() 
    { 
     std::lock_guard<std::mutex> lock(m_LockMutex); 

     if (m_FreeThreads.empty() || m_Tasks.empty()) 
      return; 

     //Get free thread 
     MutableThread* pThread = m_FreeThreads.back(); 
     m_FreeThreads.pop_back(); 

     //park thread in used list 
     m_UsedThreads.push_back(pThread); 

     //get task with highest priority 
     std::set<Task>::iterator it = m_Tasks.end(); 
     --it; //last entry has highest priority 

     //start the task 
     pThread->Start(it->m_Function); 

     //remove the task from the list 
     m_Tasks.erase(it);   
    } 

的AddTask功能由同一個線程調用多次,但是當一個可變的線程信號的線程池(通過m_Observer - > Signal(this))應用程序在AssignThreads()函數的lock_guard上凍結。現在,奇怪的事情與正常的死鎖不同,Visual Studio中的所有callstack視圖都是空的,只要我嘗試用lock_guard越過行。

任何人都可以解釋這種行爲嗎?是否有任何主要的設計缺陷或只是一個簡單的混合?

感謝您的幫助!

問候, 費邊

編輯:我添加了重現問題最小的Visual Studio解決方案:ThreadPoolTest.zip

+0

我沒有你的問題的答案,但如果你正在一個Windows系統上工作,你應該嘗試分析調用std :: async調用沒有線程池。根據我的經驗,它的速度要快很多,因爲windows似乎非常擅長優化線程。 –

+0

這似乎是一個好主意,我會檢查出來。 – Fabian

回答

0

多虧了朋友,我能夠通過移動電話m_Observer來解決這個問題 - >在MutableThread :: Execute()函數的鎖定範圍之外發出信號(this)。其次,我在AssignThreads()函數中刪除了lock_guard,並將其調用移入了Signal()/ AddTask函數中的lock_guard範圍。沒有真正相關,但仍然是一個缺陷:所有的condition_variables.wait()調用現在都在while(m_bNotified == false)循環中。