觀光了解zeromq:
- 結合/連接順序通常並不重要
- 推/拉用於當一個對等應接收的每個消息和/或消息不應當被丟棄
- PUB/SUB用於所有對等方都應接收消息和/或發送無人聽到時發送的消息。
- ZeroMQ故意隱藏從應用程序代碼中按設計連接/斷開打開/關閉事件,因此您無法檢測到實際關閉事件。
您需要知道的一件事情是,您不應該:套接字連接時,它會創建一個管道(對等端不需要存在)。當套接字綁定時,它只在對等體連接時創建管道。這些管道管理套接字的HWM行爲。這意味着沒有對等連接套接字和沒有對等連接套接字的行爲是不同的。如果您嘗試使用綁定套接字發送郵件,則沒有對等方的綁定套接字將被阻止,而連接套接字將愉快地將郵件排隊等待到對方到達並開始使用郵件。
基於這幾點,你想要做的是:
- 使用推/拉
- 接收機應該綁定
- 發送一個特殊的「關閉」消息,指示隊列完成,而不是檢測到tcp/ipc級別的關閉事件。
以下是Python中的一個工作示例,它使用IPC套接字(文件)進行通信,接收方在發送方之後開始一段時間。
公共信息雙方需要知道:
import time
import zmq
# the file used for IPC communication
PIPE = '/tmp/fifo-pipe'
# command flags for our tiny message protocol
DONE = b'\x00'
MSG = b'\x01'
接收器(PULL)結合,並佔用直到DONE
def receiver():
ctx = zmq.Context()
s = ctx.socket(zmq.PULL)
s.bind("ipc://%s" % PIPE)
while True:
parts = s.recv_multipart()
cmd = parts[0]
if cmd == DONE:
print "[R] received DONE"
break
msg = parts[1]
# handle the message
print "[R] %.1f consuming %s" % (time.time() - t0, msg)
s.close()
ctx.term()
print "[R] done"
發送者(PUSH)連接,併發送,發送完成對應信號完成
def sender():
ctx = zmq.Context()
s = ctx.socket(zmq.PUSH)
s.connect("ipc://%s" % PIPE)
for i in range(10):
msg = b'msg %i' % i
print "[S] %.1f sending %s" % (time.time() - t0, msg)
s.send_multipart([MSG, msg])
time.sleep(1)
print "[S] sending DONE"
s.send(DONE)
s.close()
ctx.term()
print "[S] done"
還有一個演示腳本,它們一起運行,並且發送者啓動荷蘭國際集團第一,和接收器啓動後發送者已經發送了幾條消息:
from threading import Thread
# global t0, just for keeping times relative to start, rather than 1970
t0 = time.time()
# start the sender
s = Thread(target=sender)
s.start()
# start the receiver after a delay
time.sleep(5)
r = Thread(target=receiver)
r.start()
# wait for them both to finish
s.join()
r.join()
從而可以看出一起here運行。