2011-03-29 49 views
2

我試圖使用zeroMQ作爲在多個線程之間實現消息傳遞系統的一種方式。我嘗試了下面的代碼,但它不起作用;在具體的每個線程中調用zmq_recv不會等待/阻止任何消息被執行。使用ZeroMQ消息的線程間通信

你能幫我這段代碼嗎?

我使用Linux操作系統和gcc

問候

AFG

static void * 
    worker_routine (void *context) { 
     // Socket to talk to dispatcher 
     void *receiver = zmq_socket (context, ZMQ_REP); 
     zmq_connect (receiver, "inproc://workers"); 
     while (1) { 

      zmq_msg_t request; 
      zmq_msg_init(&request); 
      zmq_recv(receiver, &request, 0); 
      printf ("Received request\n"); 
      // Do some 'work' 
      usleep (1000); 
      // Send reply back to client 
      zmq_send (receiver, &request, 0); 
     } 
     zmq_close (receiver); 
     return NULL; 
    } 

    int main (void) { 

    void *context = zmq_init (1); 
    void *clients = zmq_socket (context, ZMQ_REP); 
    zmq_bind (clients, "inproc://workers"); 

    int thread_nbr; 
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) { 
     pthread_t worker; 
     pthread_create (&worker, NULL, worker_routine, context); 
    } 

    zmq_close (clients); 
    zmq_term (context); 
    return 0; 
    } 
+0

我再次閱讀指南ZeroMQ。有誰知道我是否需要爲我的目的創建一個像QUEUE這樣的zmq_device?我還注意到,有樣本使用「ipc」作爲協議..我總是認爲對於MT我必須使用「inproc」..任何人都知道這是否會產生影響? – 2011-03-29 21:44:58

回答

3

你關閉插座,ZeroMQ要創建的線程之後。他們可能沒有時間達到阻止狀態,如果他們這樣做了,只要您銷燬zmq上下文,他們就會失敗。從zmq_term man page

上下文終止通過以下步驟進行:

任何阻塞目前在插座範圍內開放正在進行的操作應立即返回與ETERM的錯誤代碼。

+0

我可能需要添加也嘗試添加一些「睡眠」;無論如何,我仍然不知道我預期的行爲是否滿足該設置..請參閱我的意見下面的問題。 PS。謝謝你的幫助! – 2011-03-29 21:46:29

6

兩個套接字都是REP。你想要的是REQ + REP。

0

首先,由於@sustrik指出你需要使用REQREP,主線程和工作線程都不能是REP

其次,您需要提供一些種類的阻塞循環在你的主線程:

int main (int argc, char **argv) 
{ 
    void *context = zmq_init (1); 
    void *clients = zmq_socket (context, ZMQ_REP); // use ZMQ_REQ on the clients 
    zmq_bind (clients, "inproc://workers"); 

    int thread_nbr; 
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) { 
     pthread_t worker; 
     pthread_create (&worker, NULL, worker_routine, context); 
    } 

    while (TRUE) 
    { 
        // worker thread connected asking for work 
        zmq_msg_t request; 
        zmq_msg_init (&request); 
     zmq_recv (clients, &request, 0); 
     zmq_msg_close (&request); 

     // do whatever you need to do with the clients' request here 

     // send work to clients 
     zmq_msg_t reply; 
     zmq_msg_init_data (&reply, "Reply", 5, NULL, NULL); 
     zmq_send (clients, &reply, 0); 
     zmq_msg_close (&reply); 
    } 

    zmq_close (clients); 
    zmq_term (context); 
    return 0; 
}