2011-06-05 54 views
5

這是我的腳本。zero mq酒吧/分與多部分不工作


#!/usr/bin/env python 

import traceback 
import sys 
import zmq 
from time import sleep 

print "Creating the zmq.Context" 
context = zmq.Context() 

print "Binding the publisher to the local socket at port 5557" 
sender = context.socket(zmq.PUB) 
sender.bind("tcp://*:5557") 

print "Binding the subscriber to the local socket at port 5557" 
receiver = context.socket(zmq.SUB) 
receiver.connect("tcp://*:5557") 

print "Setting the subscriber option to get only those originating from \"B\"" 
receiver.setsockopt(zmq.SUBSCRIBE, "B") 

print "Waiting a second for the socket to be created." 
sleep(1) 

print "Sending messages" 
for i in range(1,10): 
    msg = "msg %d" % (i) 
    env = None 
    if i % 2 == 0: 
     env = ["B", msg] 
    else: 
     env = ["A", msg] 
    print "Sending Message: ", env 
    sender.send_multipart(env) 

print "Closing the sender." 
sender.close() 

failed_attempts = 0 
while failed_attempts < 3: 
    try: 
     print str(receiver.recv_multipart(zmq.NOBLOCK)) 
    except: 
     print traceback.format_exception(*sys.exc_info()) 
     failed_attempts += 1 

print "Closing the receiver." 
receiver.close() 

print "Terminating the context." 
context.term() 

""" 
Output: 

Creating the zmq.Context 
Binding the publisher to the local socket at port 5557 
Binding the subscriber to the local socket at port 5557 
Setting the subscriber option to get only those originating from "B" 
Waiting a second for the socket to be created. 
Sending messages 
Sending Message: ['A', 'msg 1'] 
Sending Message: ['B', 'msg 2'] 
Sending Message: ['A', 'msg 3'] 
Sending Message: ['B', 'msg 4'] 
Sending Message: ['A', 'msg 5'] 
Sending Message: ['B', 'msg 6'] 
Sending Message: ['A', 'msg 7'] 
Sending Message: ['B', 'msg 8'] 
Sending Message: ['A', 'msg 9'] 
Closing the sender. 
['B', 'msg 2'] 
['B', 'msg 4'] 
['B', 'msg 6'] 
['B', 'msg 8'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
Closing the receiver. 
Terminating the context. 
""" 

而問題是......爲什麼這段代碼不工作?

[編輯]在zeromq郵件列表上獲得超級快速響應後,我更新了上面的代碼。

回答

8

信用:查雷梅斯

您可能需要插座創建步驟(綁定,連接setsockopt的)和消息的實際傳輸之間的「休眠」。綁定&連接操作是異步的,因此當您到達發送所有消息的邏輯時,它們可能無法完成。在這種情況下,通過PUB套接字發送的任何消息都將被丟棄,因爲zmq_bind()操作在另一個套接字成功連接到該套接字之前不會創建隊列。

作爲一個方面說明,在本例中不需要創建2個上下文。兩個套接字都可以在相同的上下文中創建。它沒有傷害,但它也沒有必要。

來源:彼得

有一個 「問題解決者」 在Ch1結束,說明這一點。

某些套接字類型(ROUTER和PUB)將自動丟棄沒有收件人的郵件 。正如Chuck所說,連接是異步的,需要大約100毫秒。如果你啓動兩個線程,綁定 的一端,連接另一端,然後立即啓動以通過這種套接字類型發送數據 ,則將丟失前100毫秒的數據 (大約)。

做一個睡眠是一個殘酷的「證明它有效」的選項。實際上 你會以某種方式進行同步,或者(更一般地)希望消息丟失 作爲正常啓動的一部分(即,將發佈的數據看作純粹的 廣播,沒有明確的開始或結束)。

查看天氣更新示例,syncpub和syncsub瞭解詳情。