最終,答案是否。當數據到達您可以鏈接到的ZeroMQ時,目前沒有回調/通知。我也無法找到任何增加此功能的分支。
我無法獲得ZMQ的工作,同時使用傳統的OnReceive調用在單個線程內提供的MFC套接字框架,並添加第二個線程專門用於ZMQ,因此擊敗了使用它的整個目的(它被用於線程同步)。
我的實現最終刪除了MFC套接字,併爲我的inproc服務器(用於與其他線程通信)以及我的TCP(非ZMQ)服務器連接並使用阻止輪詢調用(zmq_poll())使用ZMQ )在OnIdle()方法中(每次返回1以創建一個忙碌循環)。阻塞民意調查
BOOL CMyThreaClass::OnIdle(LONG lCount)
{
UNREFERENCED_PARAMETER(lCount);
zmq_pollitem_t items [] = {
{ m_pZMQInprocServer, 0, ZMQ_POLLIN, 0 },
{ m_pZMQTCPSocket, 0, ZMQ_POLLIN, 0 }
};
const int iZMQInfiniteTimeout(-1);
iResult = zmq_poll(&items[0], sizeof(items)/sizeof(items[0]), iZMQInfiniteTimeout);
TRACE("zmq_poll result: %d\n", iResult);
if (items[0].revents & ZMQ_POLLIN)
{
sMyStruct sMessage;
iResult = zmq_recv(m_pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_DONTWAIT); // don't block (the zmq_poll blocks for us)
TRACE("inproc recv result: %d\n", iResult);
// Process inproc messages
iResult = zmq_send(pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_NULL); // block
TRACE("inproc send result: %d\n", iResult);
}
if (items[1].revents & ZMQ_POLLIN)
{
// there will be an ZMQ_IDENTITY identifier on the beginning of the socket buffer, read it off first
uint8_t id [256];
size_t id_size = 256;
iResult = zmq_getsockopt(m_pZMQTCPSocket, ZMQ_IDENTITY, id, &id_size);
TRACE("getsockopt poll result %d:id %d\n", iResult, id);
iResult = zmq_recv(m_pZMQTCPSocket, &id, id_Size, ZMQ_DONTWAIT); // don't block
// now get our actual data
char szBuffer[1024];
int iBytesReceived = zmq_recv(m_pZMQSocket, szBuffer, sizeof(szBuffer), ZMQ_DONTWAIT);
if (iBytesReceived > 0)
{
// process TCP data
}
}
}
注:這個答案需要使用ZMQ 4或更高版本,因爲早期版本的ZMQ不會與常規TCP套接字連接進行通信。
這實際上是我目前實現的方式(在CWinThread派生類中使用OnIdle),儘管我只是使用zmq_recv而不是輪詢,因爲我現在只有一個通信通道設置。問題是我沒有辦法控制OnIdle何時被調用,最終發生的是接收線程處於空閒狀態(已經處理了一個沒有被讀取的OnIdle事件),當發送者發送一個新的請求時。新請求不會觸發另一次讀取。編輯後的 – kinar 2014-11-21 20:58:03
,你可以從'OnIdle'返回非零值,讓MFC再次調用它。 – japreiss 2014-11-21 21:40:06
這個(已編輯的)解決方案不適合我的原因是我的ZMQ與主應用程序通常處於空閒狀態的單獨線程中。總是在空閒線程中返回OnIdle會創建一個繁忙循環並佔用100%CPU核心無所作爲。在線程停止時返回錯誤的結果,因爲ZMQ沒有被綁定到消息泵中,因此OnIdle不會再次被調用。 – kinar 2014-11-24 15:55:06