2013-03-01 168 views
1

我有兩個進程('發送者'和'接收者')需要通過瞬態單向FIFO通信管道進行通信,本地在一臺機器上進行通信。下面是我想(用語言更接近於Unix域套接字)來發生的事情:ZeroMQ和本地FIFO

  • 發件人「創建」管道在著名的地址,並立即發送郵件,降低IT
  • 在某些時候(之前或後發送「創建」管),接收器連接到管
  • 閱讀器讀取消息關閉管的
  • 發件人「關閉」所有的消息都被讀出管
  • 閱讀器通知(可能的是管道關閉)

我的問題是:我如何使用ZeroMQ實現這一點? 「PUB/SUB」,「推/拉」?在ZMQ套接字中檢測「數據結束」的機制是什麼?是否可以同時允許上述前兩項的排序:即發送者或接收者是否首先嚐試連接?如果是這樣,怎麼樣?

謝謝。

回答

2

觀光了解zeromq:

  1. 結合/連接順序通常並不重要
  2. 推/拉用於當一個對等應接收的每個消息和/或消息不應當被丟棄
  3. PUB/SUB用於所有對等方都應接收消息和/或發送無人聽到時發送的消息。
  4. ZeroMQ故意隱藏從應用程序代碼中按設計連接/斷開打開/關閉事件,因此您無法檢測到實際關閉事件。

您需要知道的一件事情是,您不應該:套接字連接時,它會創建一個管道(對等端不需要存在)。當套接字綁定時,它只在對等體連接時創建管道。這些管道管理套接字的HWM行爲。這意味着沒有對等連接套接字和沒有對等連接套接字的行爲是不同的。如果您嘗試使用綁定套接字發送郵件,則沒有對等方的綁定套接字將被阻止,而連接套接字將愉快地將郵件排隊等待到對方到達並開始使用郵件。

基於這幾點,你想要做的是:

  1. 使用推/拉
  2. 接收機應該綁定
  3. 發送一個特殊的「關閉」消息,指示隊列完成,而不是檢測到tcp/ipc級別的關閉事件。

以下是Python中的一個工作示例,它使用IPC套接字(文件)進行通信,接收方在發送方之後開始一段時間。

公共信息雙方需要知道:

import time 

import zmq 

# the file used for IPC communication 
PIPE = '/tmp/fifo-pipe' 

# command flags for our tiny message protocol 
DONE = b'\x00' 
MSG = b'\x01' 

接收器(PULL)結合,並佔用直到DONE

def receiver(): 
    ctx = zmq.Context() 
    s = ctx.socket(zmq.PULL) 
    s.bind("ipc://%s" % PIPE) 
    while True: 
     parts = s.recv_multipart() 
     cmd = parts[0] 
     if cmd == DONE: 
      print "[R] received DONE" 
      break 
     msg = parts[1] 
     # handle the message 
     print "[R] %.1f consuming %s" % (time.time() - t0, msg) 
    s.close() 
    ctx.term() 
    print "[R] done" 

發送者(PUSH)連接,併發送,發送完成對應信號完成

def sender(): 
    ctx = zmq.Context() 
    s = ctx.socket(zmq.PUSH) 
    s.connect("ipc://%s" % PIPE) 

    for i in range(10): 
     msg = b'msg %i' % i 
     print "[S] %.1f sending %s" % (time.time() - t0, msg) 
     s.send_multipart([MSG, msg]) 
     time.sleep(1) 
    print "[S] sending DONE" 
    s.send(DONE) 
    s.close() 
    ctx.term() 
    print "[S] done" 

還有一個演示腳本,它們一起運行,並且發送者啓動荷蘭國際集團第一,和接收器啓動後發送者已經發送了幾條消息:

from threading import Thread 

# global t0, just for keeping times relative to start, rather than 1970 
t0 = time.time() 

# start the sender 
s = Thread(target=sender) 
s.start() 

# start the receiver after a delay 
time.sleep(5) 
r = Thread(target=receiver) 
r.start() 

# wait for them both to finish 
s.join() 
r.join() 

從而可以看出一起here運行。