2017-02-28 100 views
1

我一直在圍繞C++中的回調函數進行封裝。我試圖實現以下內容:C++ lambda回調觸發事件

我有兩個對象,每個都有自己的線程。一個對象A具有指向第二個對象B的指針。見例如:

class A 
{ 
    public: 
    // ... 
    private: 
    std::unique_ptr<B> b; 
}; 

class B 
{ 
    public: 
    void add_message(MessageType msg); 
    // ... 
}; 

我想實現是有目的A使用指針添加一條消息,B,然後繼續做其他的東西,但有當B有一個是被觸發的回調或處理程序或東西回覆該消息。 B對消息做了一些處理,並可能將其傳遞給其他對象以在其自己的線程上進行處理,但最終會得到答覆。所以,我怎麼能知道什麼時候B有我的消息的答覆,例如:

// In class A 
MessageType m(); 
b->add_message(m) 
// A's thread continues doing other stuff 
... 
// some notification that b has a reply? 

我知道我可能要使用std ::功能,我想用一個回調,但我不能讓我通過查看大量示例來了解如何完成此操作。任何幫助表示讚賞,並注意我已經看了很多例子,但不能將其與我正在嘗試實現或不理解的東西聯繫起來......

+1

你可能不希望線程互相訪問。處理這種事情的通常方法是使用消息隊列。一個線程放入消息,另一個線程收到消息。隊列可以處理同步細節。現在,ZeroMQ似乎成爲這種類型的流行圖書館。 – vincent

+0

@Vincent我已經在'B'類中使用了一個隊列。 'B'擁有隊列,'A'通過'add_message(MessageType msg)'方法將消息添加到隊列中。我丟失的地方是'A'希望得到答覆,那麼它怎麼知道什麼時候有...? –

+0

你需要一個互斥或其他同步機制來防止你的線程互相破壞內存。您不能只從同一個線程讀取/寫入相同的內存(變量),而無需同步。 –

回答

3

線程是執行的序列。它們的行爲大致類似於線性C++程序,嵌入在內存模型中,可以讓它們進行通信並注意由其他執行線程引起的狀態更改。

對線程的回調無法在沒有來自線程的合作的情況下接管一系列執行。您想要通知的線程必須明確檢查消息是否已到達並處理它。


有兩種常見的方法來處理對消息的響應。

第一種是std::future之類的方法。其中,調用者獲取某種類型的令牌,該令牌代表將來可能或將要產生的答案。

第二個是再次使用消息。您向B發送消息,要求回覆。 B向A發送一條包含響應的消息。與B接收消息的方式相同,A收到消息。該消息可能包含某種「返回目標」,以幫助將其鏈接到原始消息。

在基於消息的系統中,通常有一個「事件循環」。而不是一個大型的線性程序,你有一個線程可以反覆返回到「事件循環」。在那裏它檢查消息的隊列,如果沒有消息等待一些消息。

任務有這樣的制度下被分解成一口大小的塊,這樣你檢查事件循環的頻率足以響應。

執行此操作的一種方法是使用協程,這是一種沒有擁有自己的執行程序的執行狀態(如擁有兩者的線程)。協程定期放棄優先級並「稍後保存它們的狀態」。


未來的解決方案通常是最簡單的,但它依賴於A週期性地檢查響應。

首先,threaded_queue<T>,它可以讓任何數量的生產者和消費者傳遞的東西放進一個隊列,吃起來關前:

template<class T> 
struct threaded_queue { 
    using lock = std::unique_lock<std::mutex>; 
    void push_back(T t) { 
    { 
     lock l(m); 
     data.push_back(std::move(t)); 
    } 
    cv.notify_one(); 
    } 
    boost::optional<T> pop_front() { 
    lock l(m); 
    cv.wait(l, [this]{ return abort || !data.empty(); }); 
    if (abort) return {}; 
    auto r = std::move(data.back()); 
    data.pop_back(); 
    return std::move(r); 
    } 
    void terminate() { 
    { 
     lock l(m); 
     abort = true; 
     data.clear(); 
    } 
    cv.notify_all(); 
    } 
    ~threaded_queue() 
    { 
    terminate(); 
    } 
private: 
    std::mutex m; 
    std::deque<T> data; 
    std::condition_variable cv; 
    bool abort = false; 
}; 

現在,我們要的任務傳遞到這樣的隊列,並有通過任務的人得到結果。下面是一個使用上述的與包裝的任務:

template<class...Args> 
struct threaded_task_queue { 
    threaded_task_queue() = default; 
    threaded_task_queue(threaded_task_queue&&) = delete; 
    threaded_task_queue& operator=(threaded_task_queue&&) = delete; 
    ~threaded_task_queue() = default; 
    template<class F, class R=std::result_of_t<F&(Args...)>> 
    std::future<R> queue_task(F task) { 
    std::packaged_task<R(Args...)> p(std::move(task)); 
    auto r = p.get_future(); 
    tasks.push_back(std::move(p)); 
    return r; 
    } 
    void terminate() { 
    tasks.terminate(); 
    } 
    std::function<void(Args...)> pop_task() { 
    auto task = tasks.pop_front(); 
    if (!task) return {}; 
    auto task_ptr = std::make_shared<std::packaged_task<R(Args...)>>(std::move(*task)); 
    return [task_ptr](Args...args){ 
     (*task_ptr)(std::forward<Args>(args)...); 
    }; 
    } 
private: 
    threaded_queue<std::packaged_task<void(Args...)>> tasks; 
}; 

如果我這樣做是正確的,它的工作原理是這樣的:

  • A發送隊列任務的拉姆達的形式到B。這個lambda需要一些固定的參數集合(由B提供),並返回一些值。

  • B彈出隊列,並獲取採用參數的std::function。它調用它;它在B的上下文中返回void

  • A在排隊任務時被給予future<R>。它可以查詢它是否完成。

你會注意到A不能被「通知」已完成。這需要不同的解決方案。但是如果A最終達到不等待B的結果就無法進步的程度,那麼這個系統就可以工作。另一方面,如果A積累了大量這樣的消息並且有時需要等待來自許多這樣的B的輸入,直到它們中的任何一個返回數據(或者用戶做了某事),則需要比一個std::future<R>。一般模式 - 具有代表未來計算的令牌 - 是可靠的。但是你需要擴展它以適應未來計算和消息循環等多種來源。

未經測試的代碼。

一爲threaded_task_queue方法,當你發送的消息是:

template<class Signature> 
struct message_queue; 
template<class R, class...Args> 
struct message_queue<R(Args...) : 
    threaded_task_queue< std::function<R(Args...)> > 
{ 
    std::future<R> queue_message(Args...args) { 
    return this->queue_task(
     [tup = std::make_tuple(std::forward<Args>(args)...)] 
     (std::function<R(Args...)> f) mutable 
     { 
     return std::apply(f, std::move(tup)); 
     } 
    ); 
    } 
    bool consume_message(std::function<R(Args...)> f) 
    { 
    auto task = pop_task(); 
    if (!task) return false; 
    task(std::move(f)); 
    return true; 
    } 
}; 

其中對提供商來說,你提供Args...,並在消費者一方,你消耗Args...和返回R,並在供應商方面,你一旦消費者完成,有一個future<R>來獲得結果。

這可能比我寫的原始threaded_task_queue更自然。

std::apply是C++ 17,但是在C++ 11和C++ 14中有野性的實現。

+0

將函數'threaded_queue :: push_back'中的'cv.notify_one()'保留在另一個代碼塊之外的原因是什麼? – Steephen

+1

@Steephen'notify_one'不需要你持有互斥體。所以我沒有。在某些實現中,這可能會有所幫助,因爲偵聽線程不會被通知喚醒,而會立即阻塞互斥量。 – Yakk

+0

明白了。非常感謝您的澄清。 – Steephen