2016-03-06 63 views
2

我在C++中擁有以下代碼。該代碼是從C++ Concurrency In Action: Practical Multithreading在線程在C++中完成後向線程分配新任務

void do_work(unsigned id); 

void f() { 
    std::vector<std::thread> threads; 
    for(unsigned i = 0; i < 20; ++i) { 
     threads.push_back(std::thread(do_work, i)); 
    } 
    std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join)); 
} 

假設線程[0]已完成處理並返回一個值。我仍然有更多的文件要處理,現在想將這個新文件分配給完成的線程。我如何在C++中實現這種行爲?或者我必須銷燬線程,並在線程完成後創建一個新線程?但是,如何檢查這些線程是否已完成?

+0

查找。您將需要更多級別的間接工作負載傳遞。 –

+0

您需要有一種方法將工作傳遞給您的線程(通過隊列或其他方式)。這與你所展示的有很大不同。這對於proactor來說是一個很好的用例,就像boost :: asio一樣 – Chad

回答

2

「我怎樣才能在C++中實現這種行爲」的簡短答案就是編寫代碼來完成它。您已經確定自己的第一步是「如何檢查這些線程是否已完成」。

有幾種基本的方法。但是,它們都歸結爲同一件事情:不是讓每個線程簡單地消失,而是在每個線程終止之前通知父進程它已完成。

對於初學者來說,每個線程應該知道它是哪個線程。在你的例子中,所有線程都被放置在std::vector中,並且它們由向量的索引來標識。這不是唯一的方法。還有其他的方法來牧養所有的線索,但爲了答案的目的,這將會做到。

然後,每個線程都需要通過將線程索引號作爲線程參數傳遞來知道它是哪個索引。你的代碼已經做了。精彩。

現在,只需關閉循環的結尾:你只需要實例化一個std::mutex,具有std::condition_variable,這是保護std::queuestd::list。或者,也許是整數的一個std::set。您可以自由決定哪個容器最適合您。

然後,每個線程終止之前,它:

  • 鎖定互斥。

  • 將其線程索引放入容器中。

  • 表示條件變量。

  • 解鎖互斥鎖,然後立即返回,終止此線程。

然後,父線程,開始所有的線程:

  • 鎖定互斥

  • 檢查隊列/套/不管是空的。如果是,它會等待條件變量,直到它不是。

  • 從隊列/ set/whatever中刪除線程索引,並加入該線程。該線程剛剛結束。現在你知道哪個線程被終止了,並且可以做任何你想要的信息。

  • 完成處理或重新啓動線程後,它會再次檢查隊列是否爲空。

1

下面是Sam Varshavchik解釋的基本實現。

Live demo

爲什麼我添加了一個local_queue的原因是爲了確保我們的m_Mutex解鎖的時候了。如果將其刪除,調用push_task的線程可能會被阻塞。

析構函數調用stop(),它設置m_Runningfalse,通知線程關於它,並等待它完成處理所有剩餘的任務。

如果工人階級死亡,線程也死了,這很好。

我的例子只是創建3個線程和每個線程for (int i = 0; i < 5; i++) 5個任務,主要是爲了確保在ideone所有的輸出顯示,但我已經有10個線程和每個線程任務5000測試了它和它運行得很好。

do_work函數有兩行,如果您希望輸出流正確同步,您可以取消註釋。 該課程有多線程支持。

可以stop()並重新start()線程多次,你在線程池實現像

class Worker 
{ 
public: 
    Worker(bool start) : m_Running(start) { if (start) private_start(); } 
    Worker() : m_Running(false) { } 
    ~Worker() { stop(); } 

    template<typename... Args> 
    void push_task(Args&&... args) 
    { 
     { 
      std::lock_guard<std::mutex> lk(m_Mutex); 
      m_Queue.push_back(std::bind(std::forward<Args>(args)...)); 
     } 

     m_Condition.notify_all(); 
    } 

    void start() 
    { 
     { 
      std::lock_guard<std::mutex> lk(m_Mutex); 
      if (m_Running == true) return; 
      m_Running = true; 
     } 

     private_start(); 
    } 

    void stop() 
    { 
     { 
      std::lock_guard<std::mutex> lk(m_Mutex); 
      if (m_Running == false) return; 
      m_Running = false; 
     } 

     m_Condition.notify_all(); 
     m_Thread.join(); 
    } 

private: 
    void private_start() 
    { 
     m_Thread = std::thread([this] 
     { 
      for (;;) 
      { 
       decltype(m_Queue) local_queue; 
       { 
        std::unique_lock<std::mutex> lk(m_Mutex); 
        m_Condition.wait(lk, [&] { return !m_Queue.empty() + !m_Running; }); 

        if (!m_Running) 
        { 
         for (auto& func : m_Queue) 
          func(); 

         m_Queue.clear(); 
         return; 
        } 

        std::swap(m_Queue, local_queue); 
       } 

       for (auto& func : local_queue) 
        func(); 
      } 
     }); 
    } 

private: 
    std::condition_variable m_Condition; 
    std::list<std::function<void()>> m_Queue; 
    std::mutex m_Mutex; 
    std::thread m_Thread; 
    bool m_Running = false; 
}; 

void do_work(unsigned id) 
{ 
    //static std::mutex cout_mutex; 
    //std::lock_guard<std::mutex> lk(cout_mutex); 
    std::cout << id << std::endl; 
} 

int main() 
{ 
    { 
     Worker workers[3]; 
     int counter = 0; 

     for (auto& worker : workers) 
      worker.start(); 

     for (auto& worker : workers) 
     { 
      for (int i = 0; i < 5; i++) 
       worker.push_task(do_work, ++counter + i); 
     } 
    } 

    std::cout << "finish" << std::endl; 
    getchar(); 

    return 0; 
} 
相關問題