boost::asio and Active Object

2013-05-01

I have implemented a module-based Active Object design pattern. It is a fairly simple implementation: I have a Scheduler, an ActivationList, Requests, and Futures for receiving responses. My requirements are as follows:

  • Access to the Active Object should be via execution of its methods within its own thread (a main requirement and assumption of the Active Object design pattern).
  • The caller should be able to specify a priority for request execution. This means that whenever more than zero requests are waiting to execute, they are ordered by the priority assigned to each request. Requests with higher priority execute first, so if the ActivationList always contains pending requests with a higher priority than a given request, that request may never execute; that is acceptable to me.
  • It should be possible to specify the maximum number of pending requests in the list (to limit memory usage).
  • It should be possible to invalidate (cancel) all pending requests.
  • Requests should be able to return a value (blocking the caller), or simply execute without returning a value while still blocking the caller until the request is processed, or execute without blocking the caller at all, in which case the caller does not care whether the request has been processed yet.
  • Before a request executes, a guard method should run to check whether the given request should execute at all. If not, it should return some indeterminate value to the caller (in my current implementation boost::none, since every request's return type is boost::optional).

OK, the question: is it possible to use boost::asio and fulfill all of my requirements? My implementation works, but I would like to use something that is probably implemented much better than what I have written. I would also like to learn it for the future rather than "reinvent the wheel" again.

Boost Asio doesn't block. The last statement covers the second-to-last part. Everything can be done entirely in plain C++, but it is undoubtedly easier to use it. If you haven't already, you might also want to look at Boost Serialization. – johnathon 2013-05-01 01:19:54

I have already implemented it in plain C++, actually with the help of Boost threads and a Boost multi-index container. But the goal is not to use my implementation, but rather boost::asio. – user2301299 2013-05-01 02:07:29

Answer

Boost.Asio can be used to encompass the intent of Active Object: to decouple method execution from method invocation. Additional requirements will need to be handled at a higher level, but it is not overly complicated when using Boost.Asio in conjunction with other Boost libraries.

The Scheduler could use:

  • boost::asio::io_service to function as a thread pool, with boost::asio::io_service::work keeping the threads servicing it.
  • boost::thread_group to manage the lifetime of the threads.

The ActivationList could be implemented as:

  • A Boost.MultiIndex, for obtaining the highest-priority method request. With a hinted-position insert(), the insertion order is preserved for requests of the same priority.
  • std::multiset or std::multimap could be used. However, in C++03 the order of requests with the same key (priority) is unspecified.
  • If Requests do not need a guard method, then std::priority_queue could be used.

The Request could be an unspecified type:

  • boost::function and boost::bind could be used to provide type erasure, binding to callable types without introducing a Request hierarchy.

Futures could use Boost.Thread's futures support. If a Request has been added to the ActivationList:

  • future.valid() will return true.
  • future.wait() will block while waiting for the result to become available.
  • future.get() will block while waiting for the result.
  • If the caller does nothing with the future, then the caller is not blocked.
  • An additional benefit of using Boost.Thread's futures is that an exception originating from within a Request will be passed to the Future.

Here is a complete example leveraging various Boost libraries that should fulfill the requirements:

// Standard includes 
#include <algorithm> // std::find_if 
#include <iostream> 
#include <string> 

// 3rd party includes 
#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/chrono.hpp> // for boost::chrono::seconds used with sleep_for 
#include <boost/function.hpp> 
#include <boost/make_shared.hpp> 
#include <boost/multi_index_container.hpp> 
#include <boost/multi_index/ordered_index.hpp> 
#include <boost/multi_index/member.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/thread.hpp> 
#include <boost/utility/result_of.hpp> 

/// @brief scheduler that provides limits with prioritized jobs. 
template <typename Priority, 
      typename Compare = std::less<Priority> > 
class scheduler 
{ 
public: 
    typedef Priority priority_type; 
private: 

    /// @brief method_request is used to couple the guard and call 
    ///  functions for a given method. 
    struct method_request 
    { 
    typedef boost::function<bool()> ready_func_type; 
    typedef boost::function<void()> run_func_type; 

    template <typename ReadyFunctor, 
       typename RunFunctor> 
    method_request(ReadyFunctor ready, 
        RunFunctor run) 
     : ready(ready), 
     run(run) 
    {} 

    ready_func_type ready; 
    run_func_type run; 
    }; 

    /// @brief Pair type used to associate a request with its priority. 
    typedef std::pair<priority_type, 
        boost::shared_ptr<method_request> > pair_type; 

    static bool is_method_ready(const pair_type& pair) 
    { 
    return pair.second->ready(); 
    } 

public: 

    /// @brief Construct scheduler. 
    /// 
    /// @param max_threads Maximum amount of concurrent task. 
    /// @param max_request Maximum amount of request. 
    scheduler(std::size_t max_threads, 
      std::size_t max_request) 
    : work_(io_service_), 
     max_request_(max_request), 
     request_count_(0) 
    { 
    // Spawn threads, dedicating them to the io_service. 
    for (std::size_t i = 0; i < max_threads; ++i) 
     threads_.create_thread(
     boost::bind(&boost::asio::io_service::run, &io_service_)); 
    } 

    /// @brief Destructor. 
    ~scheduler() 
    { 
    // Release threads from the io_service. 
    io_service_.stop(); 
    // Cleanup. 
    threads_.join_all(); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param priority Priority of job. 
    /// @param ready_func Invoked to check if method is ready to run. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename ReadyFunctor, 
      typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(priority_type priority, 
     const ReadyFunctor& ready_func, 
     const RunFunctor& run_func) 
    { 
    typedef typename boost::result_of<RunFunctor()>::type result_type; 
    typedef boost::unique_future<result_type> future_type; 

    boost::unique_lock<mutex_type> lock(mutex_); 

    // If max request has been reached, then return an invalid future. 
    if (max_request_ && 
     (request_count_ == max_request_)) 
     return future_type(); 

    ++request_count_; 

    // Use a packaged task to handle populating promise and future. 
    typedef boost::packaged_task<result_type> task_type; 

    // Bind does not work with rvalue, and packaged_task is only moveable, 
    // so allocate a shared pointer. 
    boost::shared_ptr<task_type> task = 
     boost::make_shared<task_type>(run_func); 

    // Create method request. 
    boost::shared_ptr<method_request> request = 
     boost::make_shared<method_request>(
     ready_func, 
     boost::bind(&task_type::operator(), task)); 

    // Insert into priority. Hint to inserting as close to the end as 
    // possible to preserve insertion order for request with same priority. 
    activation_list_.insert(activation_list_.end(), 
          pair_type(priority, request)); 

    // There is now an outstanding request, so post to dispatch. 
    io_service_.post(boost::bind(&scheduler::dispatch, this)); 

    return task->get_future(); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param ready_func Invoked to check if method is ready to run. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename ReadyFunctor, 
      typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(const ReadyFunctor& ready_func, 
     const RunFunctor& run_func) 
    { 
    return insert(priority_type(), ready_func, run_func); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param priority Priority of job. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(priority_type priority, 
     const RunFunctor& run_func) 
    { 
    return insert(priority, &always_ready, run_func); 
    } 

    /// @brief Insert a method request with default priority into the 
    ///  scheduler. 
    /// 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @param functor Job to run. 
    /// 
    /// @return future associated with the job. 
    template <typename RunFunc> 
    boost::unique_future<typename boost::result_of<RunFunc()>::type> 
    insert(const RunFunc& run_func) 
    { 
    return insert(&always_ready, run_func); 
    } 

    /// @brief Cancel all outstanding request. 
    void cancel() 
    { 
    boost::unique_lock<mutex_type> lock(mutex_); 
    activation_list_.clear(); 
    request_count_ = 0; 
    } 

private: 

    /// @brief Dispatch a request. 
    void dispatch() 
    { 
    // Get the current highest priority request ready to run from the queue. 
    boost::unique_lock<mutex_type> lock(mutex_); 
    if (activation_list_.empty()) return; 

    // Find the highest priority method ready to run. 
    typedef typename activation_list_type::iterator iterator; 
    iterator end = activation_list_.end(); 
    iterator result = std::find_if(
     activation_list_.begin(), end, &is_method_ready); 

    // If no methods are ready, then post into dispatch, as the 
    // method may have become ready. 
    if (end == result) 
    { 
     io_service_.post(boost::bind(&scheduler::dispatch, this)); 
     return; 
    } 

    // Take ownership of request. 
    boost::shared_ptr<method_request> method = result->second; 
    activation_list_.erase(result); 

    // Run method without mutex. 
    lock.unlock(); 
    method->run();  
    lock.lock(); 

    // Perform bookkeeping. 
    --request_count_; 
    } 

    static bool always_ready() { return true; } 

private: 

    /// @brief List of outstanding request. 
    typedef boost::multi_index_container< 
    pair_type, 
    boost::multi_index::indexed_by< 
     boost::multi_index::ordered_non_unique< 
     boost::multi_index::member<pair_type, 
            typename pair_type::first_type, 
            &pair_type::first>, 
     Compare 
     > 
    > 
    > activation_list_type; 
    activation_list_type activation_list_; 

    /// @brief Thread group managing threads servicing pool. 
    boost::thread_group threads_; 

    /// @brief io_service used to function as a thread pool. 
    boost::asio::io_service io_service_; 

    /// @brief Work is used to keep threads servicing io_service. 
    boost::asio::io_service::work work_; 

    /// @brief Maximum amount of request. 
    const std::size_t max_request_; 

    /// @brief Count of outstanding request. 
    std::size_t request_count_; 

    /// @brief Synchronize access to the activation list. 
    typedef boost::mutex mutex_type; 
    mutex_type mutex_; 
}; 

typedef scheduler<unsigned int, 
        std::greater<unsigned int> > high_priority_scheduler; 

/// @brief adder is a simple proxy that will delegate work to 
///  the scheduler. 
class adder 
{ 
public: 
    adder(high_priority_scheduler& scheduler) 
    : scheduler_(scheduler) 
    {} 

    /// @brief Add a and b with a priority. 
    /// 
    /// @return Return future result. 
    template <typename T> 
    boost::unique_future<T> add(
    high_priority_scheduler::priority_type priority, 
    const T& a, const T& b) 
    { 
    // Insert method request 
    return scheduler_.insert(
     priority, 
     boost::bind(&adder::do_add<T>, a, b)); 
    } 

    /// @brief Add a and b. 
    /// 
    /// @return Return future result. 
    template <typename T> 
    boost::unique_future<T> add(const T& a, const T& b) 
    { 
    return add(high_priority_scheduler::priority_type(), a, b); 
    } 

private: 

    /// @brief Actual add a and b. 
    template <typename T> 
    static T do_add(const T& a, const T& b) 
    { 
    std::cout << "Starting addition of '" << a 
       << "' and '" << b << "'" << std::endl; 
    // Mimic busy work. 
    boost::this_thread::sleep_for(boost::chrono::seconds(2)); 
    std::cout << "Finished addition" << std::endl; 
    return a + b; 
    } 

private: 
    high_priority_scheduler& scheduler_; 
}; 

bool get(bool& value) { return value; } 
void guarded_call() 
{ 
    std::cout << "guarded_call" << std::endl; 
} 

int main() 
{ 
    const unsigned int max_threads = 1; 
    const unsigned int max_request = 4; 

    // Scheduler 
    high_priority_scheduler scheduler(max_threads, max_request); 

    // Proxy 
    adder adder(scheduler); 

    // Client 

    // Add guarded method to scheduler. 
    bool ready = false; 
    std::cout << "Add guarded method." << std::endl; 
    boost::unique_future<void> future1 = scheduler.insert(
    boost::bind(&get, boost::ref(ready)), 
    &guarded_call); 

    // Add 1 + 100 with default priority. 
    boost::unique_future<int> future2 = adder.add(1, 100); 

    // Force sleep to try to get scheduler to run request 2 first. 
    boost::this_thread::sleep_for(boost::chrono::seconds(1)); 

    // Add: 
    // 2 + 200 with low priority (5) 
    // "test" + "this" with high priority (99) 
    boost::unique_future<int> future3 = adder.add(5, 2, 200); 
    boost::unique_future<std::string> future4 = adder.add(99, 
    std::string("test"), std::string("this")); 

    // Max request should have been reached, so add another. 
    boost::unique_future<int> future5 = adder.add(3, 300); 

    // Check if request was added. 
    std::cout << "future1 is valid: " << future1.valid() 
      << "\nfuture2 is valid: " << future2.valid() 
      << "\nfuture3 is valid: " << future3.valid() 
      << "\nfuture4 is valid: " << future4.valid() 
      << "\nfuture5 is valid: " << future5.valid() 
      << std::endl; 

    // Get results for future2 and future3. Do nothing with future4's results. 
    std::cout << "future2 result: " << future2.get() 
      << "\nfuture3 result: " << future3.get() 
      << std::endl; 

    std::cout << "Unguarding method." << std::endl; 
    ready = true; 
    future1.wait(); 
} 

The execution uses a thread pool of 1 with a maximum of 4 requests:

  • request1 is guarded until the end of the program, and should run last.
  • request2 (1 + 100) is inserted with default priority, and should run first.
  • request3 (2 + 200) is inserted with low priority, and should run after request4.
  • request4 ('test' + 'this') is inserted with high priority, and should run before request3.
  • request5 should fail to be inserted due to the maximum request count, and should not be valid.

The output is as follows:

Add guarded method. 
Starting addition of '1' and '100' 
future1 is valid: 1 
future2 is valid: 1 
future3 is valid: 1 
future4 is valid: 1 
future5 is valid: 0 
Finished addition 
Starting addition of 'test' and 'this' 
Finished addition 
Starting addition of '2' and '200' 
Finished addition 
future2 result: 101 
future3 result: 202 
Unguarding method. 
guarded_call
Thanks for this answer. I wish I could give you one more upvote. – MrEvil 2013-07-08 16:20:32

Very helpful post; examples like this are what Boost lacks, so that I don't have to scour GitHub/SO for them. – arynaq 2017-09-29 08:34:55