2017-10-12 81 views
0

我試圖使用pyzmq的multiprocessing.Process一個領域內PUB/SUB插座裏面的原型沒有收到消息:pub/sub模型一個multiprocessing.Process

我有一個用戶:

import time 
import collections  
import zmq 

context = zmq.Context() 
socket = context.socket(zmq.SUB) 
socket.setsockopt(zmq.SUBSCRIBE, b"") 
socket.connect("tcp://localhost:5000") 

nb_recv = 0 
begin = time.time() 
counter = collections.defaultdict(int) 
while True: 
    msg = socket.recv_json() 
    print(msg) 

和兩個不同的發佈者實現。

有了這一個,訂戶接收消息:

import zmq 
from multiprocessing import Process 

class Sender(object): 

    def __init__(self): 
     self._context = zmq.Context() 
     pass 

    def run(self): 
     self._socket = self._context.socket(zmq.PUB) 
     self._socket.bind("tcp://127.0.0.1:5000") 
     seq_num = 0 
     while True: 
      msg = { "sequence": seq_num } 
      self._socket.send_json(msg) 
      seq_num += 1 

if __name__ == "__main__": 
    s = Sender() 
    p = Process(target=s.run) 
    p.start() 
    p.join() 

但與這一個(其中,唯一的區別是的socket的創建是在構造,而不是被所述run()類方法),訂戶不接收任何消息:

import zmq 
from multiprocessing import Process 

class Sender(object): 

    def __init__(self): 
     self._context = zmq.Context() 
     self._socket = self._context.socket(zmq.PUB) # <--------- 
     pass 

    def run(self): 
     self._socket.bind("tcp://127.0.0.1:5000") 
     seq_num = 0 
     while True: 
      msg = { "sequence": seq_num } 
      self._socket.send_json(msg) 
      seq_num += 1 

if __name__ == "__main__": 
    s = Sender() 
    p = Process(target=s.run) 
    p.start() 
    p.join() 

當我用threading.Thread替換multiprocessing.Process時,這兩個類都工作正常,但我沒有在文檔中找到任何解釋。

回答

0

您正在一個進程中創建對象,並試圖在另一個進程中執行該方法。

可能createmultiprocessing進程之間甚至share對象與multiprocessing.Manager但因爲這個對象持有不可共享的資源(網絡接口),你最好的工作進程中創建它,除非你想走走酸洗雷區,並用不可揀拾的田地來使用物體。