2010-11-25 53 views
1

我想獲得關於下面列出的IService類的一些反饋。據我所知,這種類型與「活動對象」模式有關。如果我錯誤地使用任何相關術語,請原諒/糾正。基本上這個想法是,使用這個活動對象類的類需要提供一個控制某個事件循環的start和stop方法。這個事件循環可以通過while循環或boost asio等實現。線程相關的活動對象設計問題(C++ boost)

該類負責以非阻塞的方式啓動一個新線程,以便事件可以在新線程中處理。它還必須處理所有清理相關的代碼。我首先嚐試了一種面向對象方法,其中子類負責覆蓋方法來控制事件循環,但清理很麻煩:在析構函數調用stop方法時,在調用類沒有手動調用停止方法。模板化的解決方案似乎是一個更加簡潔:

template <typename T> 
class IService : private boost::noncopyable 
{ 
    typedef boost::shared_ptr<boost::thread> thread_ptr; 
public: 

    IService() 
    { 
    } 

    ~IService() 
    { 
    /// try stop the service in case it's running 
    stop(); 
    } 

    void start() 
    { 
    boost::mutex::scoped_lock lock(m_threadMutex); 

    if (m_pServiceThread && m_pServiceThread->joinable()) 
    { 
     // already running 
     return; 
    } 

    m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService::main, this))); 

    // need to wait for thread to start: else if destructor is called before thread has started 

    // Wait for condition to be signaled and then 
    // try timed wait since the application could deadlock if the thread never starts? 
    //if (m_startCondition.timed_wait(m_threadMutex, boost::posix_time::milliseconds(getServiceTimeoutMs()))) 
    //{ 
    //} 
    m_startCondition.wait(m_threadMutex); 

    // notify main to continue: it's blocked on the same condition var 
    m_startCondition.notify_one(); 
    } 

    void stop() 
    { 
    // trigger the stopping of the event loop 
    m_serviceObject.stop(); 

    if (m_pServiceThread) 
    { 
     if (m_pServiceThread->joinable()) 
     { 
     m_pServiceThread->join(); 
     } 
     // the service is stopped so we can reset the thread 
     m_pServiceThread.reset(); 
    } 
    } 

private: 
    /// entry point of thread 
    void main() 
    { 
    boost::mutex::scoped_lock lock(m_threadMutex); 
    // notify main thread that it can continue 
    m_startCondition.notify_one(); 

    // Try Dummy wait to allow 1st thread to resume??? 
    m_startCondition.wait(m_threadMutex); 

    // call template implementation of event loop 
    m_serviceObject.start(); 
    } 

    /// Service thread 
    thread_ptr m_pServiceThread; 
    /// Thread mutex 
    mutable boost::mutex m_threadMutex; 
    /// Condition for signaling start of thread 
    boost::condition m_startCondition; 

    /// T must satisfy the implicit service interface and provide a start and a stop method 
    T m_serviceObject; 
}; 

類可以使用如下:

class TestObject3 
{ 
public: 
    TestObject3() 
     :m_work(m_ioService), 
     m_timer(m_ioService, boost::posix_time::milliseconds(200)) 
    { 
     m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error)); 
    } 

    void start() 
    { 
     // simple event loop 
     m_ioService.run(); 
    } 

    void stop() 
    { 
     // signal end of event loop 
     m_ioService.stop(); 
    } 

    void doWork(const boost::system::error_code& e) 
    { 
     // Do some work here 
     if (e != boost::asio::error::operation_aborted) 
     { 
     m_timer.expires_from_now(boost::posix_time::milliseconds(200)); 
     m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error)); 
     } 
    } 

private: 
    boost::asio::io_service m_ioService; 
    boost::asio::io_service::work m_work; 
    boost::asio::deadline_timer m_timer; 
}; 

現在我的具體問題:

1)是利用升壓條件變量正確嗎?這對我來說似乎有些破綻:我想等待線程啓動,所以我等待條件變量。然後,一旦新的線程在main方法中啓動,我再次等待相同的條件變量以允許初始線程繼續。然後一旦初始線程的啓動方法退出,新線程就可以繼續。這個可以嗎?

2)是否有任何情況下,線程不會被操作系統成功啓動?我記得在某處可能會發生這種情況。如果可以的話,我寧願在條件變量上進行定時等待(正如start方法中註釋的那樣)?我知道模板類無法正確地實現停止方法,即如果事件循環無法停止,代碼將阻止連接(在停止或在析構函數中),但是我看不到這個。我猜這取決於該課程的用戶,以確保啓動和停止方法正確實施?

4)我將不勝感激任何其他設計錯誤,改進等?

謝謝!

1)經過大量的測試中使用的條件變量的優良似乎

2)這個問題還沒有冒出了(還)

3)模板類:

回答

0

下列終於塵埃落定實施必須滿足的要求,單元測試用於測試 正確性

4)改進

  • 添加鎖
  • 在捕獲產生的線程異常,在主線程中重新拋出,以避免崩潰並沒有鬆動的異常信息加入
  • 使用boost ::系統:: ERROR_CODE溝通錯誤代碼回呼叫者
  • 實施對象設置能夠

代碼:

template <typename T> 
class IService : private boost::noncopyable 
{ 
    typedef boost::shared_ptr<boost::thread> thread_ptr; 
    typedef T ServiceImpl; 
public: 
    typedef boost::shared_ptr<IService<T> > ptr; 

    IService() 
    :m_pServiceObject(&m_serviceObject) 
    { 
    } 

    ~IService() 
    { 
    /// try stop the service in case it's running 
    if (m_pServiceThread && m_pServiceThread->joinable()) 
    { 
     stop(); 
    } 
    } 

    static ptr create() 
    { 
    return boost::make_shared<IService<T> >(); 
    } 

    /// Accessor to service implementation. The handle can be used to configure the implementation object 
    ServiceImpl& get() { return m_serviceObject; } 
    /// Mutator to service implementation. The handle can be used to configure the implementation object 
    void set(ServiceImpl rServiceImpl) 
    { 
    // the implementation object cannot be modified once the thread has been created 
    assert(m_pServiceThread == 0); 
    m_serviceObject = rServiceImpl; 
    m_pServiceObject = &m_serviceObject; 
    } 

    void set(ServiceImpl* pServiceImpl) 
    { 
    // the implementation object cannot be modified once the thread has been created 
    assert(m_pServiceThread == 0); 

    // make sure service object is valid 
    if (pServiceImpl) 
     m_pServiceObject = pServiceImpl; 
    } 

    /// if the service implementation reports an error from the start or stop method call, it can be accessed via this method 
    /// NB: only the last error can be accessed 
    boost::system::error_code getServiceErrorCode() const { return m_ecService; } 

    /// The join method allows the caller to block until thread completion 
    void join() 
    { 
    // protect this method from being called twice (e.g. by user and by stop) 
    boost::mutex::scoped_lock lock(m_joinMutex); 
    if (m_pServiceThread && m_pServiceThread->joinable()) 
    { 
     m_pServiceThread->join(); 
     m_pServiceThread.reset(); 
    } 
    } 

    /// This method launches the non-blocking service 
    boost::system::error_code start() 
    { 
    boost::mutex::scoped_lock lock(m_threadMutex); 

    if (m_pServiceThread && m_pServiceThread->joinable()) 
    { 
     // already running 
     return boost::system::error_code(SHARED_INVALID_STATE, shared_category); 
    } 

    m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService2::main, this))); 
    // Wait for condition to be signaled 
    m_startCondition.wait(m_threadMutex); 

    // notify main to continue: it's blocked on the same condition var 
    m_startCondition.notify_one(); 
    // No error 
    return boost::system::error_code(); 
    } 

    /// This method stops the non-blocking service 
    boost::system::error_code stop() 
    { 
    // trigger the stopping of the event loop 
    //boost::system::error_code ec = m_serviceObject.stop(); 
    assert(m_pServiceObject); 
    boost::system::error_code ec = m_pServiceObject->stop(); 
    if (ec) 
    { 
     m_ecService = ec; 
     return ec; 
    } 

    // The service implementation can return an error code here for more information 
    // However it is the responsibility of the implementation to stop the service event loop (if running) 
    // Failure to do so, will result in a block 
    // If this occurs in practice, we may consider a timed join? 
    join(); 

    // If exception was thrown in new thread, rethrow it. 
    // Should the template implementation class want to avoid this, it should catch the exception 
    // in its start method and then return and error code instead 
    if(m_exception) 
     boost::rethrow_exception(m_exception); 

    return ec; 
    } 

private: 
    /// runs in it's own thread 
    void main() 
    { 
    try 
    { 
     boost::mutex::scoped_lock lock(m_threadMutex); 
     // notify main thread that it can continue 
     m_startCondition.notify_one(); 
     // Try Dummy wait to allow 1st thread to resume 
     m_startCondition.wait(m_threadMutex); 

     // call implementation of event loop 
     // This will block 
     // In scenarios where the service fails to start, the implementation can return an error code 
     m_ecService = m_pServiceObject->start(); 

     m_exception = boost::exception_ptr(); 
    } 
    catch (...) 
    { 
     m_exception = boost::current_exception(); 
    } 
    } 

    /// Service thread 
    thread_ptr m_pServiceThread; 
    /// Thread mutex 
    mutable boost::mutex m_threadMutex; 
    /// Join mutex 
    mutable boost::mutex m_joinMutex; 
    /// Condition for signaling start of thread 
    boost::condition m_startCondition; 

    /// T must satisfy the implicit service interface and provide a start and a stop method 
    T m_serviceObject; 
    T* m_pServiceObject; 
    // Error code for service implementation errors 
    boost::system::error_code m_ecService; 

    // Exception ptr to transport exception across different threads 
    boost::exception_ptr m_exception; 
}; 

進一步反饋/批評當然會受到歡迎。