2017-03-03 60 views
1

一個簡單的例子,我有一個後續How to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(C++)?什麼是工作XSUB/XPUB代理的zeromq

這個問題請使用XSUB和XPUB一個C++代理。給出的答案基本上是下面引用的代理main()函數。

我將此代理擴展爲包括髮布者和訂閱者的完整工作示例。問題在於我的代碼只適用於經銷商/路由器選項(如下面的評論所示)。通過下面的實際(未註釋)XPUB/XSUB選項,用戶不會收到消息。出了什麼問題?是否有一個調整來獲取消息?

代理不XPUB/XSUB工作(工作經銷商/在評論路由器)

#include <zmq.hpp> 

int main(int argc, char* argv[]) { 
    zmq::context_t ctx(1); 
    zmq::socket_t frontend(ctx, /*ZMQ_ROUTER*/ ZMQ_XSUB); 
    zmq::socket_t backend(ctx, /*ZMQ_DEALER*/ ZMQ_XPUB); 
    frontend.bind("tcp://*:5570"); 
    backend.bind("tcp://*:5571"); 
    zmq::proxy(frontend, backend, nullptr); 
    return 0; 
} 

用戶不ZMQ_SUB工作(在評論工作的經銷商/路由器選項)

#include <iostream> 
#include <zmq.hpp> 

std::string GetStringFromMessage(const zmq::message_t& msg) { 
    char* tmp = new char[msg.size()+1]; 
    memcpy(tmp,msg.data(),msg.size()); 
    tmp[msg.size()] = '\0'; 
    std::string rval(tmp); 
    delete[] tmp; 
    return rval; 
} 

int main(int argc, char* argv[]) { 
    zmq::context_t ctx(1); 
    zmq::socket_t socket(ctx, /*ZMQ_DEALER*/ ZMQ_SUB); 
    socket.connect("tcp://localhost:5571"); 
    while (true) { 
     zmq::message_t identity; 
     zmq::message_t message; 
     socket.recv(&identity); 
     socket.recv(&message); 
     std::string identityStr(GetStringFromMessage(identity)); 
     std::string messageStr(GetStringFromMessage(message)); 
     std::cout << "Identity: " << identityStr << std::endl; 
     std::cout << "Message: " << messageStr << std::endl; 
    } 
} 

發行商與ZMQ_PUB不兼容(評論中的工作經銷商/路由器選項)

#include <unistd.h> 
#include <sstream> 
#include <zmq.hpp> 

int main (int argc, char* argv[]) 
{ 
    // Context 
    zmq::context_t ctx(1); 

    // Create a socket and set its identity attribute 
    zmq::socket_t socket(ctx, /*ZMQ_DEALER*/ ZMQ_PUB); 
    char identity[10] = {}; 
    sprintf(identity, "%d", getpid()); 
    socket.setsockopt(ZMQ_IDENTITY, identity, strlen(identity)); 
    socket.connect("tcp://localhost:5570"); 

    // Send some messages 
    unsigned int counter = 0; 
    while (true) { 
     std::ostringstream ss; 
     ss << "Message #" << counter << " from PID " << getpid(); 
     socket.send(ss.str().c_str(),ss.str().length()); 
     counter++; 
     sleep(1); 
    } 
    return 0; 
} 
+0

似乎是一個緩慢的加入者問題。我遇到了同樣的問題 - 訂閱者無法獲取發佈者消息,除非在發送任何內容之前向發佈者添加「sleep(1)」。我還沒有找到一個可行的解決方案。看起來像XPUB/XSUB是一個破損的構造 – RPGillespie

+0

感謝RPGillespie尋找這個。 – Predrag3141

+0

經銷商/路由器不執行XPUB/XSUB的操作:每條消息都路由到經銷商/路由器的唯一用戶。所以在我問這個問題的時候,我沒有給所有用戶發送實例。 我解決了XPUB/XSUB無法通過使用ZMQ_PUSH套接字發佈以及ZMQ_PULL套接字作爲代理前端的問題。 – Predrag3141

回答

0

在用戶代碼中,您尚未訂閱接收來自發布者的消息。嘗試添加一行:

socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); 

前/後線:

socket.connect("tcp://localhost:5571"); 

在你的用戶代碼