2012-01-30 53 views
5

我想序列化一個boost :: signals2信號的多線程調用,以確保有關從一個對象的狀態變化的通知按照明確定義的順序到達插槽。有沒有現成的方法來序列化boost :: signals2信號的調用?

背景

我有在多線程程序的內部狀態的對象。內部狀態的某些部分是對程序的其他部分有趣,並且對象通過一個boost :: signals2信號,類似這樣暴露狀態的改變:

class ObjectWithState { 
public: 
    enum State { 
     STATE_A, 
     STATE_B, 
     STATE_C, 
    }; 

    void OnEvent() { 
     State newState; 
     { 
      boost::lock_guard<boost::mutex> lock(m_Mutex); 
      // Process event and change state 
      m_State = ...; 
      newState = m_State; 
     } 
     m_OnStateChanged(newState); 
    } 

    // method to allow external objects to connect to the signal etc 
private: 
    boost::signals2::signal<void (State) > m_OnStateChanged; 
    boost::mutex m_Mutex; 
    State m_State; 
}; 

問題

如果有是OnEvent處理程序的多個併發調用,這可能會導致偵聽程序以其他順序的狀態更改的通知而不是實際發生的更改。狀態本身受上述互斥體的保護,所以實際的狀態很好定義。然而,互斥信號不能在通話過程中保持,因爲這可能導致死鎖。這意味着信號的實際調用可能以任何順序發生,而我會要求它們按照狀態更改實際發生的順序調用。

解決此問題的一種方法是從信號中移除狀態並僅通知偵聽器狀態已更改。然後他們可以查詢對象的狀態,並獲得對象在信號被觸發時的狀態或更高的狀態。在我的場景中,聽衆需要被告知所有的狀態改變,所以這種方法在這裏不起作用。

我的下一個方法是類似以下內容:

class ObjectWithState { 
public: 
    enum State { 
     STATE_A, 
     STATE_B, 
     STATE_C, 
    }; 

    void OnEvent() { 
     State newState; 
     boost::unique_future<void> waitForPrevious; 
     boost::shared_ptr<boost::promise<void> > releaseNext; 
     { 
      boost::lock_guard<boost::mutex> lock(m_Mutex); 
      // Process event and change state 
      m_State = ...; 
      newState = m_State; 
      waitForPrevious = m_CurrentInvocation->get_future(); 
      m_CurrentInvocation.reset(new boost::promise<void>()); 
      releaseNext = m_CurrentInvocation; 
     } 
     // Wait for all previous invocations of the signal to finish 
     waitForPrevious.get(); 

     // Now it is our turn to invoke the signal 
     // TODO: use try-catch/scoped object to release next if an exception is thrown 
     OnStateChanged(newState); 

     // Allow the next state change to use the signal 
     releaseNext->set_value(); 
    } 

    // method to allow external objects to connect to the signal etc 
private: 
    boost::signals2::signal<void (State) > m_OnStateChanged; 
    boost::mutex m_Mutex; 
    State m_State; 
    // Initialized with a "fulfilled" promise in the constructor 
    // or do special handling of initially empty promise above 
    boost::shared_ptr<boost::promise<void> > m_CurrentInvocation; 
}; 

我還沒有試過上面的代碼,所以它可能會與錯誤和編譯錯誤地散落着,但它應該是可以推導出什麼我之後。我的直覺告訴我,我不是第一個遇到這種類型的問題,我更喜歡使用經過試驗和測試的代碼,以我自己的... :)所以我的問題是:

是否有一種預先存在的方式實現boost :: signals2信號的序列化調用(如內置於signals2庫或普通模式)?

+0

「但是,互斥信號無法在通話過程中保持,因爲這可能導致死鎖。」你能澄清一下嗎?一些信號處理程序可以改變ObjectWithState的狀態嗎?他們會產生新的'OnEvent()'調用嗎?你如何確保你不會進入無限遞歸? – user1202136 2012-04-03 15:03:44

+0

@ user1202136:是的。遞歸調用將是一種解決死鎖的方法。更偷偷摸摸的方法是,如果你有兩個ObjectWithState併爲狀態變化設置一個處理程序,在某些情況下會觸發另一個狀態變化。這導致一個線程持有第一個互斥體並嘗試鎖定另一個線程,另一個線程持有另一個互斥體並嘗試鎖定第一個互斥體。因此,一般指導原則在調用未知代碼時永遠不要鎖定,請參閱http://drdobbs.com/article/print?articleId=202802983&siteSectionName=進行討論。 – villintehaspam 2012-04-04 08:44:38

回答

0

我建議以下解決方案。創建一個掛起信號的隊列,並有一個單獨的線程分派它們。代碼大致如下所示:

class ObjectWithState { 
private: 
    bool running; 
    std::queue<State> pendingSignals; 
    boost::condition_variable cond; 
    boost::mutex mut; 

    void dispatcherThread() 
    { 
     while (running) 
     { 
      /* local copy, so we don't need to hold a lock */ 
      std::vector<State> pendingSignalsCopy; 

      /* wait for new signals, then copy them locally */ 
      { 
       boost::unique_lock<boost::mutex> lock(mut); 
       cond.wait(mut); 
       pendingSignalsCopy = pendingSignals; 
       pendingSignals.clear(); 
      } 

      /* dispatch */ 
      while (!pendingSignalsCopy.empty()) 
      { 
       State newState = pendingSignalsCopy.front(); 
       OnStateChanged(newState); 
       pendingSignalsCopy.pop(); 
      } 
     } 
    } 

public: 
    void OnEvent() 
    { 
     State newState; 
     ... 

     /* add signal to queue of pending signals and wake up dispatcher thread */ 
     { 
      boost::unique_lock<boost::mutex> lock(mut); 
      pendingSignals.push(state); 
      cond.notify_all(); 
     } 
    } 
}; 
+0

這將部分實現我所要做的同樣的事情,但我並不那麼興奮地爲每個對象分配一個線程。因此,接下來的事情就是爲所有對象共享一個線程 - 但是爲此專門分配一個線程還是有點尷尬,所以您需要在線程池中分配工作。另外,這個實現並不能保證當OnStateChanged函數退出時,所有的監聽器都會得到通知,這在某些情況下可能是個問題(但是其他的一個特性)。 – villintehaspam 2012-04-04 09:48:44

+0

「另外,這個實現並不能保證當OnStateChanged函數退出時,所有監聽器都被通知」爲什麼不? IIRC,boost :: signals2 :: signal :: operator()遍歷所有槽,然後返回。此外,您的提議基本上阻止了waitForPrevious.get()中的所有線程(除了一個)。因此,爲通知奉獻一個線程可能會更好。 – user1202136 2012-04-04 13:09:30

+0

用OnEvent替換OnStateChanged,抱歉有關混淆。阻塞的區別正是我所指出的,在某些情況下,阻止線程對於確保通知已經發生是必要的,而在其他情況下,阻塞線程是一個可以讓線程繼續前進的功能。 – villintehaspam 2012-04-04 14:16:08

相關問題