2015-10-07 924 views
1

嗯,我需要幫助。我正在嘗試做一些具體的事情,而我缺乏多線程技能正在讓我失望。C++主線程通知線程通知主線程

基本上我的主程序/線程需要管理一些必須運行多次的「通道」。由於這些運行是獨立的,每個通道都包含一個執行它們的線程。

因此,主線程必須等待所有通道(線程)完成其運行才能啓動下一個通道。 並且所有頻道都必須等待主線程可以運行的通知。

以下是我如何做到的 - 抱歉有點長!

#include <thread> 
#include <mutex> 
#include <condition_variable> 
#include <iostream> 
#include <atomic> 

std::mutex    g_lockprint; 
std::mutex    g_lockbatch; 
std::condition_variable g_nextbatch; 
std::mutex    g_lockready; 
std::condition_variable g_ready; 

int global_id = 0; 
int nbChannels = 5; 
std::atomic<int> nbChannelsLeftToEnd; 

class Channel { 

private: 

    int _id; 
    std::thread _th; 
    std::atomic<bool> next_batch; 
    std::atomic<bool> stop_th; 

public: 

    Channel() : _id(global_id++), _th(), next_batch(false), stop_th(false) {} 

    void go_for_next_batch() { next_batch = true; } 

    void start(int& start, int &end){ 
     _th = std::thread(&Channel::run, this, std::ref(start), std::ref(end)); 
    } 

    void stop(){ 
     stop_th = true; 
     _th.join(); 
    } 

    void run(int& start, int& end){ 
     while (!stop_th){ 
      { 
       std::unique_lock<std::mutex> locker(g_lockbatch); 
       g_nextbatch.wait(locker, [&](){return (next_batch==true); }); 
      } 

      // print a starting message 
      { 
       std::unique_lock<std::mutex> locker(g_lockprint); 
       std::cout << "[channel " << _id << "]\trunning in [" << start << "," << end << "]" << std::endl; 
      } 

      // simulate work 
      std::this_thread::sleep_for(std::chrono::seconds(1)); 

      // update the number of channels left to run 
      nbChannelsLeftToEnd--; 
      g_ready.notify_one(); 
      next_batch = false; 
     } 
    } 
}; 

int main() 
{ 
    int end = 100; 
    int batch = 10; 
    int startBatch = 0; 
    int endBatch = startBatch + batch; 

    // declare some channels (threads) 
    std::vector<Channel> channels(nbChannels); 

    // start the threads 
    for (auto& ch : channels) ch.start(startBatch, endBatch); 

    while (endBatch<=end){ 
     { 
      std::unique_lock<std::mutex> locker(g_lockprint); 
      std::cout << "[main]\trunning in [" << startBatch << "," << endBatch << "]" << std::endl; 
     } 
     nbChannelsLeftToEnd = nbChannels; 
     for (auto& ch : channels) ch.go_for_next_batch(); 
     g_nextbatch.notify_all(); 

     std::unique_lock<std::mutex> locker(g_lockready); 
     g_ready.wait(locker, [&](){return (nbChannelsLeftToEnd == 0); }); 

     startBatch += batch; 
     endBatch += batch; 
    } 

    for (auto& ch : channels) ch.stop(); 

    return 0; 
} 

但有時程序塊,可能線程彼此等待,但我看不出爲什麼。 在任何情況下,加入線程(主要結束處的「stop」方法)會使我的程序無限期地運行,看不出爲什麼。

編輯:感謝您的意見和一些研究,我設法使用同步屏障獲得工作程序,以便主線程可以等待所有其他線程完成當前批處理,然後再告訴他們開始下一個線程。 我重用從別人這裏被引用Anthony Wiiliams's book阻隔碼 - 這裏的屏障

class barrier 
{ 
    unsigned const count; 
    std::atomic<unsigned> spaces; 
    std::atomic<unsigned> generation; 

public: 
    explicit barrier(unsigned count_) : 
     count(count_), spaces(count), generation(0) {} 

    void wait() 
    { 
     unsigned const my_generation = generation; 
     if (!--spaces) 
     { 
      spaces = count; 
      ++generation; 
     } 
     else 
     { 
      while (generation == my_generation) 
       std::this_thread::yield(); 
     } 
    } 
}; 

下面是使用屏障通道類新運行方法 - 注意附加測試的「stop_th 「國旗。當線程在最後一批之後並且在被連接之前被解鎖時,它不應該運行另一批次,因此該測試。

void run(int& start, int& end, barrier& b) 
{ 
    while (!stop_th){ 
     // wait for next batch notification - use the next_batch flag to avoid 
     // spurious wake-ups 
     { 
      std::unique_lock<std::mutex> locker(g_lockbatch); 
      g_nextbatch.wait(locker, [&](){return (next_batch==true); }); 
     } 

     if (stop_th) return; 

     // simulate work 
     std::this_thread::sleep_for(std::chrono::seconds(1)); 

     // wait for everyone to meet 
     next_batch = false; 
     b.wait(); 
    } 
} 

最後這裏的主要

int main() 
{ 
    int end = 100; 
    int batch = 10; 
    int startBatch = 0; 
    int endBatch = startBatch + batch; 

    // declare a barrier where all threads will meet 
    barrier b(nbChannels+1); 

    // declare some channels (threads) 
    std::vector<Channel> channels(nbChannels); 

    // start the threads 
    for (auto& ch : channels) ch.start(startBatch, endBatch, b); 

    while (endBatch<=end){ 

     // notify the channels they can process one batch 
     for (auto& ch : channels) ch.go_for_next_batch(); 
     g_nextbatch.notify_all(); 

     // wait until all threads have finished their batch 
     b.wait(); 

     // prepare the next one 
     startBatch += batch; 
     endBatch += batch; 
    } 

    // all channels are blocked by the next_batch condition 
    // so notify a next batch and join them 
    for (auto& ch : channels) ch.stop(); 
    for (auto& ch : channels) ch.go_for_next_batch(); 
    g_nextbatch.notify_all(); 
    for (auto& ch : channels) ch.wait_until_stopped(); 

    return 0; 
} 

再次感謝您的所有意見/答案!

+0

您應該能夠調試器連接到現場處理(如:'GDB '在Linux上),並列出的當前狀態線程。我發現這通常能夠很好地說明造成問題的原因。我懷疑你應該嘗試堅持一個互斥鎖或者只按照嚴格的順序鎖定/解鎖。 –

回答

0

由於我在cpp.sh中修復了代碼,現在看起來完成了,所以我將我的評論更改爲了答案。

關於它們在呼叫停止時不存在。 請注意,它們可能仍然卡在等待下一個批次鎖定。 考慮添加一個調用來將它們從鎖中釋放,並讓它們檢查它們是否在鎖定步驟後停止。

將stop函數分成兩個函數,一個是更改布爾值,另一個是等待的地方。 可以調用兩個函數stop和wait_until_stopped

然後將下面的代碼添加到主函數中。

而不是

for (auto& ch : channels) ch.stop(); 

用途:

for (auto& ch : channels) ch.stop(); 

for (auto& ch : channels) ch.go_for_next_batch(); 

g_nextbatch.notify_all(); 

for (auto& ch : channels) ch.wait_until_stopped(); 
+0

+1!確實線程正在等待下一批,您的解決方案完美無缺!好吧,至少當我到達那一點時,程序經常會在此之前凍結,因此可能會留下一些問題... – pinch2k4

+0

如果程序中有凍結,則很可能是死鎖。 鎖之間的競賽。 附上一個調試器,看看誰在等待@Component 10建議的人。 – Jonathan