線程是執行的序列。它們的行爲大致類似於線性C++程序,嵌入在內存模型中,可以讓它們進行通信並注意由其他執行線程引起的狀態更改。
對線程的回調無法在沒有來自線程的合作的情況下接管一系列執行。您想要通知的線程必須明確檢查消息是否已到達並處理它。
有兩種常見的方法來處理對消息的響應。
第一種是std::future
之類的方法。其中,調用者獲取某種類型的令牌,該令牌代表將來可能或將要產生的答案。
第二個是再次使用消息。您向B發送消息,要求回覆。 B向A發送一條包含響應的消息。與B接收消息的方式相同,A收到消息。該消息可能包含某種「返回目標」,以幫助將其鏈接到原始消息。
在基於消息的系統中,通常有一個「事件循環」。而不是一個大型的線性程序,你有一個線程可以反覆返回到「事件循環」。在那裏它檢查消息的隊列,如果沒有消息等待一些消息。
任務有這樣的制度下被分解成一口大小的塊,這樣你檢查事件循環的頻率足以響應。
執行此操作的一種方法是使用協程,這是一種沒有擁有自己的執行程序的執行狀態(如擁有兩者的線程)。協程定期放棄優先級並「稍後保存它們的狀態」。
未來的解決方案通常是最簡單的,但它依賴於A週期性地檢查響應。
首先,threaded_queue<T>
,它可以讓任何數量的生產者和消費者傳遞的東西放進一個隊列,吃起來關前:
template<class T>
struct threaded_queue {
using lock = std::unique_lock<std::mutex>;
void push_back(T t) {
{
lock l(m);
data.push_back(std::move(t));
}
cv.notify_one();
}
boost::optional<T> pop_front() {
lock l(m);
cv.wait(l, [this]{ return abort || !data.empty(); });
if (abort) return {};
auto r = std::move(data.back());
data.pop_back();
return std::move(r);
}
void terminate() {
{
lock l(m);
abort = true;
data.clear();
}
cv.notify_all();
}
~threaded_queue()
{
terminate();
}
private:
std::mutex m;
std::deque<T> data;
std::condition_variable cv;
bool abort = false;
};
現在,我們要的任務傳遞到這樣的隊列,並有通過任務的人得到結果。下面是一個使用上述的與包裝的任務:
template<class...Args>
struct threaded_task_queue {
threaded_task_queue() = default;
threaded_task_queue(threaded_task_queue&&) = delete;
threaded_task_queue& operator=(threaded_task_queue&&) = delete;
~threaded_task_queue() = default;
template<class F, class R=std::result_of_t<F&(Args...)>>
std::future<R> queue_task(F task) {
std::packaged_task<R(Args...)> p(std::move(task));
auto r = p.get_future();
tasks.push_back(std::move(p));
return r;
}
void terminate() {
tasks.terminate();
}
std::function<void(Args...)> pop_task() {
auto task = tasks.pop_front();
if (!task) return {};
auto task_ptr = std::make_shared<std::packaged_task<R(Args...)>>(std::move(*task));
return [task_ptr](Args...args){
(*task_ptr)(std::forward<Args>(args)...);
};
}
private:
threaded_queue<std::packaged_task<void(Args...)>> tasks;
};
如果我這樣做是正確的,它的工作原理是這樣的:
A發送隊列任務的拉姆達的形式到B。這個lambda需要一些固定的參數集合(由B提供),並返回一些值。
B彈出隊列,並獲取採用參數的std::function
。它調用它;它在B的上下文中返回void
。
A在排隊任務時被給予future<R>
。它可以查詢它是否完成。
你會注意到A不能被「通知」已完成。這需要不同的解決方案。但是如果A最終達到不等待B的結果就無法進步的程度,那麼這個系統就可以工作。另一方面,如果A積累了大量這樣的消息並且有時需要等待來自許多這樣的B的輸入,直到它們中的任何一個返回數據(或者用戶做了某事),則需要比一個std::future<R>
。一般模式 - 具有代表未來計算的令牌 - 是可靠的。但是你需要擴展它以適應未來計算和消息循環等多種來源。
未經測試的代碼。
一爲threaded_task_queue
方法,當你發送的消息是:
template<class Signature>
struct message_queue;
template<class R, class...Args>
struct message_queue<R(Args...) :
threaded_task_queue< std::function<R(Args...)> >
{
std::future<R> queue_message(Args...args) {
return this->queue_task(
[tup = std::make_tuple(std::forward<Args>(args)...)]
(std::function<R(Args...)> f) mutable
{
return std::apply(f, std::move(tup));
}
);
}
bool consume_message(std::function<R(Args...)> f)
{
auto task = pop_task();
if (!task) return false;
task(std::move(f));
return true;
}
};
其中對提供商來說,你提供Args...
,並在消費者一方,你消耗Args...
和返回R
,並在供應商方面,你一旦消費者完成,有一個future<R>
來獲得結果。
這可能比我寫的原始threaded_task_queue
更自然。
std::apply
是C++ 17,但是在C++ 11和C++ 14中有野性的實現。
你可能不希望線程互相訪問。處理這種事情的通常方法是使用消息隊列。一個線程放入消息,另一個線程收到消息。隊列可以處理同步細節。現在,ZeroMQ似乎成爲這種類型的流行圖書館。 – vincent
@Vincent我已經在'B'類中使用了一個隊列。 'B'擁有隊列,'A'通過'add_message(MessageType msg)'方法將消息添加到隊列中。我丟失的地方是'A'希望得到答覆,那麼它怎麼知道什麼時候有...? –
你需要一個互斥或其他同步機制來防止你的線程互相破壞內存。您不能只從同一個線程讀取/寫入相同的內存(變量),而無需同步。 –