2017-10-06 99 views
1

在Python我有使用「輪詢」對象,其輪詢阻斷等待消息插座和放開毫秒的指定次數後的選項(在殼體下面,1000,在同時真塊):如何在Erlang或Elixir中使用ZeroMQ實現非阻塞套接字?

import zmq 

# now open up all the sockets 
context = zmq.Context() 
outsub = context.socket(zmq.SUB) 
outsub.bind("tcp://" + myip + ":" + str(args.outsubport)) 
outsub.setsockopt(zmq.SUBSCRIBE, b"") 
inreq = context.socket(zmq.ROUTER) 
inreq.bind("tcp://" + myip + ":" + str(args.inreqport)) 
outref = context.socket(zmq.ROUTER) 
outref.bind("tcp://" + myip + ":" + str(args.outrefport)) 
req = context.socket(zmq.ROUTER) 
req.bind("tcp://" + myip + ":" + str(args.reqport)) 
repub = context.socket(zmq.PUB) 
repub.bind("tcp://" + myip + ":" + str(args.repubport)) 

# sort out the poller 
poller = zmq.Poller() 
poller.register(inreq, zmq.POLLIN) 
poller.register(outsub, zmq.POLLIN) 
poller.register(outref, zmq.POLLIN) 
poller.register(req, zmq.POLLIN) 

# UDP socket setup for broadcasting this server's address 
cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 

# housekeeping variables 
pulsecheck = datetime.utcnow() + timedelta(seconds = 1) 
alivelist = dict() 
pulsetimeout = 5 

while True: 
    polls = dict(poller.poll(1000)) 
    if inreq in polls: 
     msg = inreq.recv_multipart() 
     if msg[1] == b"pulse":   # handle pluse 
      ansi("cyan", False, textout = " pulse" + "-" + msg[0].decode()) 
      if not msg[0] in alivelist.keys(): 
       handlechange(msg[0]) 
      alivelist[msg[0]] = datetime.utcnow() + timedelta(seconds = pulsetimeout) 
    if outsub in polls: 
     msgin = outsub.recv_multipart()[0] 
     repub.send(msgin) # republish 
     msg = unpacker(msgin) 
     if isinstance(msg, dict): 
      valu = msg.get("value") 
      print(".", end = "", flush = True) 
     else: 
      ansi("green", False, textout = msg) 

    if req in polls: 
     msg = req.recv_multipart() 
     valmsg = validate_request(msg) 
     if not valmsg[0]: 
      ansi("red", True); print(valmsg[1]); ansi() 
     elif len(alivelist) > 0: 
      targetnode = random.choice(list(alivelist.keys())) 
      inreq.send_multipart([targetnode, packer(valmsg[1])]) 
      ansi("blue", True, textout = "sent to " + targetnode.decode()) 
     else: 
      ansi("red", True, textout = "NO CONNECTED NODES TO SEND REQUEST TO") 
    if outref in polls: 
     msg = outref.recv_multipart() 
     destinataire, correlid = msg[1].split(b"/") 
     req.send_multipart([destinataire, correlid, msg[2]]) 

我想在Elixir(或Erlang)中實現類似的東西,但我的首選本機庫chumak似乎沒有實現輪詢。我如何在Erlang/Elixir中實現非阻塞接收,最好是使用Chumak,但如果需要,我會移至另一個Erlang zeroMQ庫?我的套接字模式首選是路由器發送,經銷商收到。

編輯

我的使用情況如下。我有第三方金融服務,根據請求提供數據,異步回答。因此,您可以發送多個請求,並在未指定的時間段後收到回覆,但不一定按照您發送給他們的順序回覆。

所以我需要將這個服務連接到Erlang(實際上是Elixir)和ZeroMQ似乎是一個很好的契合。連接(通過Phoenix)到Erlang/Elixir的多個用戶將發送請求,並且我需要將這些請求傳遞給此服務。

如果其中一個請求發生錯誤,或者第三方服務存在某種問題,則會出現問題。我將阻止 - 等待響應,然後無法爲鳳凰城的新請求提供服務。

基本上我想經常聽新的請求,發送它們,但如果一個請求沒有產生響應,我會有比請求少一個響應,這會導致永久等待。

我明白,如果我分別發送請求,那麼好的會產生響應,所以即使隨着時間的推移,我發送的請求和接收到的響應之間的數字差別也不會很大。也許設計理念是我不應該擔心這個?或者我應該嘗試追蹤一對一的請求響應並以某種方式暫停非響應?這是一種有效的設計模式嗎?

+0

使用一個進程擁有套接字,阻止接收和*反應*接收消息。使用另一個流程來完成其他任務(可能按照計劃)。 *你不想輪詢*,你想建立一個邏輯系統'鏈接進程,執行一些任務的合作*沒有輪詢*。這是Erlang併發方法的重點。如果你解釋你試圖達到的整體效果,它會有很大幫助 - 因爲這幾乎肯定是一個X-Y問題。有一個[Erlang/OTP聊天室](https://chat.stackoverflow.com/rooms/75358/erlang-otp)。 – zxq9

+0

@ zxq9好吧我已經更新了這個問題。它更有意義嗎?我應該把它搬到Erlang房間嗎?問題是我更喜歡Elixir學習者(開始學習,但知道大部分基礎知識,包括GenServer和應用程序)。 –

+0

@ zxq9我想我的問題是,如果Erlang進程在永遠不會接收的接收上阻塞,它將無法處理來自Phoenix的新的*傳入*請求。或者我的錯誤是我應該在另一個過程中處理所述請求? –

回答

4

您的系統是否不斷連接到異步查詢資源,或者您是否與每個查詢建立新的連接?

每種情況在Erlang都有自己的天然模型。

的情況:單(或池)長期連接(s)表示,保持與資源(與數據庫的連接會的工作方式)會話

長期連接在系統中最自然地被建模爲過程,它們唯一的工作就是表示外部資源。

這一進程的要求是:

  • 翻譯外部資源的信息到有意義的內部郵件(不只是路過垃圾通過 - 不要讓原材料,外部數據侵入你的系統,除非它是完全不透明給你)
  • 跟蹤超時的請求(這可能需要的東西有點像投票,但可以更精確地erlang:send_after/3

當然這也意味着,完成的,該模塊impleme在這個過程中需要說出該資源的協議。但是如果這樣做完成,那麼實際上不需要像MQ應用程序那樣的消息傳遞代理。

這使您可以讓進程成爲被動模式並阻止接收,而您的程序的其餘部分停止執行任何操作。如果沒有一些任意的輪詢,肯定會讓你進入邪惡的黑色計劃問題沼澤。

的情況:每次查詢

到資源的新連接。如果每個查詢需要一個新的連接模式是相似的,但在這裏,你每次查詢產生新的進程,並它代表查詢本身在您的系統內。它阻止等待響應(超時),並且沒有其他事情對它有影響。

實際上,這是更簡單的模型,因爲您不必清理過去可能超時的請求永遠不會返回的列表,也不必與通過發送的一組暫停超時消息進行交互erlang:send_after/3,並且您將您的抽象移近您的問題的實際模型

你不知道什麼時候這些查詢會返回,並且會導致一些潛在的混淆 - 因此將每個實際查詢建模爲有生命的事物是切斷邏輯混亂的最佳方式。

無論哪種方式,模型自然的問題:作爲一個併發的,非同步系統

在任何情況下,但是,你要真正做到輪詢你在Python或C或任何會的方式。這是一個併發的問題,因此建模將爲您提供更多的邏輯自由度,並且更有可能產生正確的解決方案,從而導致出現奇怪的情況。

+0

我的情況是A.是的,我聽說你沒有通過垃圾郵件,而且我確認了,但我仍然沒有得到回覆。我認爲問題的一部分是我首先使用阻塞的zeroMQ接收套接字。我需要做的是「消除並忘記」這個查詢,並且「希望」能夠反駁,如果我沒有在一定的時間內得到答案,我就會放棄一條錯誤消息。這需要一個ZeroMQ pub-sub套接字設置,而不是我使用的代理路由器模式。 從那裏我一定會去與「外部服務代表」的過程模式。 –

+1

@ThomasBrowne我不確定我是否會在這種情況下使用ZeroMQ - Erlang中的套接字處理非常簡單,因此設置一個標記的消息以便將來發送以指示超時。如果我確實使用了ZeroMQ,我可能會在第二種情況下將系統內部建模,其中每個查詢*都是您爲了跟蹤其狀態而產生的一個進程,並且它自己接收一個或多個回覆,但擁有它將其查詢內容發送給ZeroMQ處理程序(或爲其自身創建訂閱),就好像ZeroMQ進程本身就是外部資源一樣。 – zxq9