0
我試圖使用pyzmq的multiprocessing.Process
一個領域內PUB/SUB
插座裏面的原型沒有收到消息:pub/sub模型一個multiprocessing.Process
我有一個用戶:
import time
import collections
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.connect("tcp://localhost:5000")
nb_recv = 0
begin = time.time()
counter = collections.defaultdict(int)
while True:
msg = socket.recv_json()
print(msg)
和兩個不同的發佈者實現。
有了這一個,訂戶接收消息:
import zmq
from multiprocessing import Process
class Sender(object):
def __init__(self):
self._context = zmq.Context()
pass
def run(self):
self._socket = self._context.socket(zmq.PUB)
self._socket.bind("tcp://127.0.0.1:5000")
seq_num = 0
while True:
msg = { "sequence": seq_num }
self._socket.send_json(msg)
seq_num += 1
if __name__ == "__main__":
s = Sender()
p = Process(target=s.run)
p.start()
p.join()
但與這一個(其中,唯一的區別是的socket
的創建是在構造,而不是被所述run()
類方法),訂戶不接收任何消息:
import zmq
from multiprocessing import Process
class Sender(object):
def __init__(self):
self._context = zmq.Context()
self._socket = self._context.socket(zmq.PUB) # <---------
pass
def run(self):
self._socket.bind("tcp://127.0.0.1:5000")
seq_num = 0
while True:
msg = { "sequence": seq_num }
self._socket.send_json(msg)
seq_num += 1
if __name__ == "__main__":
s = Sender()
p = Process(target=s.run)
p.start()
p.join()
當我用threading.Thread
替換multiprocessing.Process
時,這兩個類都工作正常,但我沒有在文檔中找到任何解釋。