2014-09-22 90 views
1

我正在開發一個使用線程池的應用程序,向它提交任務並同步它們。主線程必須等待,直到所有提交的任務從單個循環迭代完成,然後提交另一堆任務(因爲來自下一次迭代的任務對相同數據進行操作並且它們將相互依賴)。同步任務

我的問題是,最好的方法是什麼?

到目前爲止,我想到的是每個線程在完成任務後遞增一個原子無符號整數。當整數等於提交任務的數量時,主線程繼續工作並提交另一輪任務。

這是我的第一個多線程應用程序。 這是處理這個問題的最佳和明智的方式。

我使用的是從一個很好的書「C++併發操作中複製一個線程池類:安東尼·威廉斯

這裏是類:

class thread_pool 
{ 
    std::atomic_bool done; 
    thread_safe_queue<std::function<void()> > work_queue; 
    std::vector<std::thread> threads; 
    join_threads joiner; 

    void worker_thread() 
    { 
     while(!done) 
     { 
      std::function<void()> task; 
      if(work_queue.try_pop(task)) 
      { 
       task(); 
      } 
      else 
      { 
       std::this_thread::yield(); 
      } 
     } 
    } 
public: 
    thread_pool(): 
     done(false),joiner(threads) 
    { 
     unsigned const thread_count=std::thread::hardware_concurrency(); 
     try 
     { 
      for(unsigned i=0;i<thread_count;++i) 
      { 
       threads.push_back(
        std::thread(&thread_pool::worker_thread,this)); 
      } 
     } 
     catch(...) 
     { 
      done=true; 
      throw; 
     } 
    } 

    ~thread_pool() 
    { 
     done=true; 
    } 

    template<typename FunctionType> 
    void submit(FunctionType f) 
    { 
     work_queue.push(std::function<void()>(f)); 
    } 
}; 

template<typename T> 
class threadsafe_queue 
{ 
private: 
    mutable std::mutex mut; 
    std::queue<T> data_queue; 
    std::condition_variable data_cond; 
public: 
    threadsafe_queue() 
    {} 

    void push(T new_value) 
    { 
     std::lock_guard<std::mutex> lk(mut); 
     data_queue.push(std::move(new_value)); 
     data_cond.notify_one(); 
    } 

    void wait_and_pop(T& value) 
    { 
     std::unique_lock<std::mutex> lk(mut); 
     data_cond.wait(lk, [this]{return !data_queue.empty(); }); 
     value = std::move(data_queue.front()); 
     data_queue.pop(); 
    } 

    std::shared_ptr<T> wait_and_pop() 
    { 
     std::unique_lock<std::mutex> lk(mut); 
     data_cond.wait(lk, [this]{return !data_queue.empty(); }); 
     std::shared_ptr<T> res(
      std::make_shared<T>(std::move(data_queue.front()))); 
     data_queue.pop(); 
     return res; 
    } 

    bool try_pop(T& value) 
    { 
     std::lock_guard<std::mutex> lk(mut); 
     if (data_queue.empty()) 
      return false; 
     value = std::move(data_queue.front()); 
     data_queue.pop(); 
    } 

    std::shared_ptr<T> try_pop() 
    { 
     std::lock_guard<std::mutex> lk(mut); 
     if (data_queue.empty()) 
      return std::shared_ptr<T>(); 
     std::shared_ptr<T> res(
      std::make_shared<T>(std::move(data_queue.front()))); 
     data_queue.pop(); 
     return res; 
    } 

    bool empty() const 
    { 
     std::lock_guard<std::mutex> lk(mut); 
     return data_queue.empty(); 
    } 
}; 

的main()函數:

std::condition_variable waitForThreads; 
std::mutex mut; 

std::atomic<unsigned> doneCount = 0; 

unsigned threadCount = 4; // sample concurrent thread count that I use for testing 

void synchronizeWork() 
    { 
    doneCount++; 
    if (doneCount.load() == threadCount) 
    { 
     doneCount = 0; 
     std::lock_guard<std::mutex> lock(mut); 
     waitForThreads.notify_one(); 
    } 
    } 

    void Task_A() 
    { 
    std::cout << "Task A, thread id: " << std::this_thread::get_id() << std::endl; 
    std::this_thread::sleep_for(std::chrono::milliseconds(3000)); 
    synchronizeWork(); 
    } 

int main() 
{ 
    unsigned const thread_count = std::thread::hardware_concurrency(); 
    thread_pool threadPool; 

    for (int i = 0; i < 1000; ++i) 
    { 
     for (unsigned j = 0; j < thread_count; j++) 
      threadPool.submit(Task_A); 

// Below is my way of synchronizing the tasks 

     { 
      std::unique_lock<std::mutex> lock(mut); 
      waitForThreads.wait(lock); 
     } 

    } 

回答

0

我不熟悉您正在使用的線程池類。

w^ithout使用這樣的類,通常的方式做到這一點是這樣的:

std::cout << "Spawning 3 threads...\n"; 
    std::thread t1 (pause_thread,1); 
    std::thread t2 (pause_thread,2); 
    std::thread t3 (pause_thread,3); 
    std::cout << "Done spawning threads. Now waiting for them to join:\n"; 
    t1.join(); 
    t2.join(); 
    t3.join(); 
    std::cout << "All threads joined!\n"; 

我會想象任何像樣的線程池類將允許你在同樣的事情,甚至更簡單,給你一個實現方法具阻止直到所有線程完成。我建議你仔細檢查文檔。

+2

事情是,我不想在每次需要完成任務時創建一個新線程,因爲我認爲這將是無效的(因爲需要執行相同類型的任務數千次)。 而是,一個派生線程檢查循環中是否有任何隊列中的待處理任務,然後執行該工作。 – rafaLiusz 2014-09-22 18:32:58

+0

如果創建線程花費的時間很長,那麼任務的大小太小。您應該將足夠的任務合併爲一個,以便線程開銷不重要。 – ravenspoint 2014-09-22 18:50:35

+1

完成我的任務的時間是非常重要的,因爲他們做分子動力學計算。但是我仍然不確定是否爲每次迭代創建一個新線程是解決這個問題最優雅的方式,這看起來太簡單了。 這意味着我根本不需要線程池。 – rafaLiusz 2014-09-22 19:15:23