嗯,我需要幫助。我正在嘗試做一些具體的事情,而我缺乏多線程技能正在讓我失望。C++主線程通知線程通知主線程
基本上我的主程序/線程需要管理一些必須運行多次的「通道」。由於這些運行是獨立的,每個通道都包含一個執行它們的線程。
因此,主線程必須等待所有通道(線程)完成其運行才能啓動下一個通道。 並且所有頻道都必須等待主線程可以運行的通知。
以下是我如何做到的 - 抱歉有點長!
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <atomic>
std::mutex g_lockprint;
std::mutex g_lockbatch;
std::condition_variable g_nextbatch;
std::mutex g_lockready;
std::condition_variable g_ready;
int global_id = 0;
int nbChannels = 5;
std::atomic<int> nbChannelsLeftToEnd;
class Channel {
private:
int _id;
std::thread _th;
std::atomic<bool> next_batch;
std::atomic<bool> stop_th;
public:
Channel() : _id(global_id++), _th(), next_batch(false), stop_th(false) {}
void go_for_next_batch() { next_batch = true; }
void start(int& start, int &end){
_th = std::thread(&Channel::run, this, std::ref(start), std::ref(end));
}
void stop(){
stop_th = true;
_th.join();
}
void run(int& start, int& end){
while (!stop_th){
{
std::unique_lock<std::mutex> locker(g_lockbatch);
g_nextbatch.wait(locker, [&](){return (next_batch==true); });
}
// print a starting message
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[channel " << _id << "]\trunning in [" << start << "," << end << "]" << std::endl;
}
// simulate work
std::this_thread::sleep_for(std::chrono::seconds(1));
// update the number of channels left to run
nbChannelsLeftToEnd--;
g_ready.notify_one();
next_batch = false;
}
}
};
int main()
{
int end = 100;
int batch = 10;
int startBatch = 0;
int endBatch = startBatch + batch;
// declare some channels (threads)
std::vector<Channel> channels(nbChannels);
// start the threads
for (auto& ch : channels) ch.start(startBatch, endBatch);
while (endBatch<=end){
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[main]\trunning in [" << startBatch << "," << endBatch << "]" << std::endl;
}
nbChannelsLeftToEnd = nbChannels;
for (auto& ch : channels) ch.go_for_next_batch();
g_nextbatch.notify_all();
std::unique_lock<std::mutex> locker(g_lockready);
g_ready.wait(locker, [&](){return (nbChannelsLeftToEnd == 0); });
startBatch += batch;
endBatch += batch;
}
for (auto& ch : channels) ch.stop();
return 0;
}
但有時程序塊,可能線程彼此等待,但我看不出爲什麼。 在任何情況下,加入線程(主要結束處的「stop」方法)會使我的程序無限期地運行,看不出爲什麼。
編輯:感謝您的意見和一些研究,我設法使用同步屏障獲得工作程序,以便主線程可以等待所有其他線程完成當前批處理,然後再告訴他們開始下一個線程。 我重用從別人這裏被引用Anthony Wiiliams's book阻隔碼 - 這裏的屏障:
class barrier
{
unsigned const count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;
public:
explicit barrier(unsigned count_) :
count(count_), spaces(count), generation(0) {}
void wait()
{
unsigned const my_generation = generation;
if (!--spaces)
{
spaces = count;
++generation;
}
else
{
while (generation == my_generation)
std::this_thread::yield();
}
}
};
下面是使用屏障通道類新運行方法 - 注意附加測試的「stop_th 「國旗。當線程在最後一批之後並且在被連接之前被解鎖時,它不應該運行另一批次,因此該測試。
void run(int& start, int& end, barrier& b)
{
while (!stop_th){
// wait for next batch notification - use the next_batch flag to avoid
// spurious wake-ups
{
std::unique_lock<std::mutex> locker(g_lockbatch);
g_nextbatch.wait(locker, [&](){return (next_batch==true); });
}
if (stop_th) return;
// simulate work
std::this_thread::sleep_for(std::chrono::seconds(1));
// wait for everyone to meet
next_batch = false;
b.wait();
}
}
最後這裏的主要:
int main()
{
int end = 100;
int batch = 10;
int startBatch = 0;
int endBatch = startBatch + batch;
// declare a barrier where all threads will meet
barrier b(nbChannels+1);
// declare some channels (threads)
std::vector<Channel> channels(nbChannels);
// start the threads
for (auto& ch : channels) ch.start(startBatch, endBatch, b);
while (endBatch<=end){
// notify the channels they can process one batch
for (auto& ch : channels) ch.go_for_next_batch();
g_nextbatch.notify_all();
// wait until all threads have finished their batch
b.wait();
// prepare the next one
startBatch += batch;
endBatch += batch;
}
// all channels are blocked by the next_batch condition
// so notify a next batch and join them
for (auto& ch : channels) ch.stop();
for (auto& ch : channels) ch.go_for_next_batch();
g_nextbatch.notify_all();
for (auto& ch : channels) ch.wait_until_stopped();
return 0;
}
再次感謝您的所有意見/答案!
您應該能夠調試器連接到現場處理(如:'GDB'在Linux上),並列出的當前狀態線程。我發現這通常能夠很好地說明造成問題的原因。我懷疑你應該嘗試堅持一個互斥鎖或者只按照嚴格的順序鎖定/解鎖。 –