2014-11-21 80 views
1

我想將ZMQ整合到一個依賴MFC套接字(CASyncSocket)的現有Windows應用程序中。當數據到達時,ZeroMQ是否有通知/回叫事件/消息?

我有一個CWinThread派生UI線程(沒有GUI),它使用CAsyncSocket異步地與服務器進行通信。我想添加一個ZMQ inproc通信線來處理從服務器接收的數據(在REQ/REP基礎上)與應用程序內的其他線程之間的通信。

使用CAsyncSocket時,只要接收到的套接字上有新的數據可用(這可能是對硬核MFC大師的過度簡化),MFC框架就會調用OnReceive方法。

ZMQ有沒有這樣的機制?或者,我是否必須添加一個額外的專用WorkerThread,UI線程將啓動它來處理與應用程序其餘部分的ZMQ通信?這兩個管道上的流量是最小的,所以我真的不想創建2個獨立的線程,如果我可以得到與1.

請注意,我有基本工作,我只是有問題同步。如果我使用阻塞recv/send與ZMQ,它餓死了我的CAsycSocket,因爲windows消息永遠不會被線程處理,導致有時永遠不會從ZMQ應該提供的服務器獲取數據。但是如果我使用非阻塞ZMQ調用,那麼線程經常會因爲不知道讀取ZMQ套接字而處於空閒狀態。

回答

0

最終,答案是。當數據到達您可以鏈接到的ZeroMQ時,目前沒有回調/通知。我也無法找到任何增加此功能的分支。

我無法獲得ZMQ的工作,同時使用傳統的OnReceive調用在單個線程內提供的MFC套接字框架,並添加第二個線程專門用於ZMQ,因此擊敗了使用它的整個目的(它被用於線程同步)。

我的實現最終刪除了MFC套接字,併爲我的inproc服務器(用於與其他線程通信)以及我的TCP(非ZMQ)服務器連接並使用阻止輪詢調用(zmq_poll())使用ZMQ )在OnIdle()方法中(每次返回1以創建一個忙碌循環)。阻塞民意調查

BOOL CMyThreaClass::OnIdle(LONG lCount) 
{ 
    UNREFERENCED_PARAMETER(lCount); 

    zmq_pollitem_t items [] = { 
    { m_pZMQInprocServer, 0, ZMQ_POLLIN, 0 }, 
    { m_pZMQTCPSocket, 0, ZMQ_POLLIN, 0 } 
    }; 
    const int iZMQInfiniteTimeout(-1); 
    iResult = zmq_poll(&items[0], sizeof(items)/sizeof(items[0]), iZMQInfiniteTimeout); 
    TRACE("zmq_poll result: %d\n", iResult); 

    if (items[0].revents & ZMQ_POLLIN) 
    { 
     sMyStruct sMessage; 
     iResult = zmq_recv(m_pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_DONTWAIT); // don't block (the zmq_poll blocks for us) 
     TRACE("inproc recv result: %d\n", iResult); 
     // Process inproc messages 
     iResult = zmq_send(pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_NULL); // block 
     TRACE("inproc send result: %d\n", iResult); 
    } 
    if (items[1].revents & ZMQ_POLLIN) 
    { 
     // there will be an ZMQ_IDENTITY identifier on the beginning of the socket buffer, read it off first 
     uint8_t id [256]; 
     size_t id_size = 256; 
     iResult = zmq_getsockopt(m_pZMQTCPSocket, ZMQ_IDENTITY, id, &id_size); 
     TRACE("getsockopt poll result %d:id %d\n", iResult, id); 
     iResult = zmq_recv(m_pZMQTCPSocket, &id, id_Size, ZMQ_DONTWAIT); // don't block 
     // now get our actual data 
     char szBuffer[1024]; 
     int iBytesReceived = zmq_recv(m_pZMQSocket, szBuffer, sizeof(szBuffer), ZMQ_DONTWAIT); 
     if (iBytesReceived > 0) 
     { 
      // process TCP data 
     } 
    } 
} 

注:這個答案需要使用ZMQ 4或更高版本,因爲早期版本的ZMQ不會與常規TCP套接字連接進行通信。

0

通過覆蓋CWinApp::OnIdle(),您可以在應用程序的主消息循環中用ZMQ_NOBLOCK標誌調用zmq_recv()。如果在套接字上沒有數據等待,則zmq_recv將立即返回。如果有數據,處理它 - 但要注意,如果你做了一些緩慢的事情,你會使應用程序無響應。

編輯:我沒有意識到OnIdle只有在消息隊列變空時才被調用一次。但根據MSDN's documentation,可以返回保持一個非零值獲取調用永遠:

  1. 如果消息循環檢查消息隊列和發現沒有未決的消息,它調用OnIdle和建築材料0作爲lCount參數。
  2. OnIdle執行一些處理並返回一個非零值以指示應該再次調用它以做進一步處理。
  3. 消息循環再次檢查消息隊列。如果沒有消息未決,則再次調用OnIdle,遞增lCount參數。
  4. 最終,OnIdle完成處理其所有空閒任務並返回0。這告訴消息循環停止調用OnIdle,直到從消息隊列接收到下一條消息,此時空閒循環重新啓動,參數設置爲0

我還發現this thread on GameDev.net其中一個用戶說:

所有我曾經用MFC編寫的工具,使用D3D使用的OnIdle()函數:

BOOL CD3DEditorApp::OnIdle(LONG lCount) 
{ 
    // Call base class first 
    CWinApp:OnIdle(lCount); 

    // game stuff... 

    // Always return true - this asks the framework to constantly 
    // call the Idle function when it isn't busy doing something 
    // else. 
    return TRUE; 
} 

所以,至少據一個人來說,這是一種常見的技術。

+0

這實際上是我目前實現的方式(在CWinThread派生類中使用OnIdle),儘管我只是使用zmq_recv而不是輪詢,因爲我現在只有一個通信通道設置。問題是我沒有辦法控制OnIdle何時被調用,最終發生的是接收線程處於空閒狀態(已經處理了一個沒有被讀取的OnIdle事件),當發送者發送一個新的請求時。新請求不會觸發另一次讀取。編輯後的 – kinar 2014-11-21 20:58:03

+0

,你可以從'OnIdle'返回非零值,讓MFC再次調用它。 – japreiss 2014-11-21 21:40:06

+0

這個(已編輯的)解決方案不適合我的原因是我的ZMQ與主應用程序通常處於空閒狀態的單獨線程中。總是在空閒線程中返回OnIdle會創建一個繁忙循環並佔用100%CPU核心無所作爲。在線程停止時返回錯誤的結果,因爲ZMQ沒有被綁定到消息泵中,因此OnIdle不會再次被調用。 – kinar 2014-11-24 15:55:06

0

您可以使用自定義Windows消息從線程發出線程信號。這是一個自定義消息:

#define WM_MY_MESSAGE (WM_APP + 1) 

要發送到具有窗口的線程使用PostMessage或SendMessage到HWND。使用ON_MESSAGE將其添加到窗口的消息映射中。

要將它發送給沒有窗口的CWinThread派生線程,請使用PostThreadMessage並使用ON_THREAD_MESSAGE接收它。

+0

這與問題完全無關。除非你建議修改ZMQ將消息發佈到父線程,然後創建類似於MFC套接字的自定義事件處理。另外,在線程之間使用SendMessage是一個糟糕的主意。如果可以,我會投票。 – kinar 2014-11-22 23:02:49

相關問題