在Python我有使用「輪詢」對象,其輪詢阻斷等待消息插座和放開毫秒的指定次數後的選項(在殼體下面,1000,在同時真塊):如何在Erlang或Elixir中使用ZeroMQ實現非阻塞套接字?
import zmq
# now open up all the sockets
context = zmq.Context()
outsub = context.socket(zmq.SUB)
outsub.bind("tcp://" + myip + ":" + str(args.outsubport))
outsub.setsockopt(zmq.SUBSCRIBE, b"")
inreq = context.socket(zmq.ROUTER)
inreq.bind("tcp://" + myip + ":" + str(args.inreqport))
outref = context.socket(zmq.ROUTER)
outref.bind("tcp://" + myip + ":" + str(args.outrefport))
req = context.socket(zmq.ROUTER)
req.bind("tcp://" + myip + ":" + str(args.reqport))
repub = context.socket(zmq.PUB)
repub.bind("tcp://" + myip + ":" + str(args.repubport))
# sort out the poller
poller = zmq.Poller()
poller.register(inreq, zmq.POLLIN)
poller.register(outsub, zmq.POLLIN)
poller.register(outref, zmq.POLLIN)
poller.register(req, zmq.POLLIN)
# UDP socket setup for broadcasting this server's address
cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# housekeeping variables
pulsecheck = datetime.utcnow() + timedelta(seconds = 1)
alivelist = dict()
pulsetimeout = 5
while True:
polls = dict(poller.poll(1000))
if inreq in polls:
msg = inreq.recv_multipart()
if msg[1] == b"pulse": # handle pluse
ansi("cyan", False, textout = " pulse" + "-" + msg[0].decode())
if not msg[0] in alivelist.keys():
handlechange(msg[0])
alivelist[msg[0]] = datetime.utcnow() + timedelta(seconds = pulsetimeout)
if outsub in polls:
msgin = outsub.recv_multipart()[0]
repub.send(msgin) # republish
msg = unpacker(msgin)
if isinstance(msg, dict):
valu = msg.get("value")
print(".", end = "", flush = True)
else:
ansi("green", False, textout = msg)
if req in polls:
msg = req.recv_multipart()
valmsg = validate_request(msg)
if not valmsg[0]:
ansi("red", True); print(valmsg[1]); ansi()
elif len(alivelist) > 0:
targetnode = random.choice(list(alivelist.keys()))
inreq.send_multipart([targetnode, packer(valmsg[1])])
ansi("blue", True, textout = "sent to " + targetnode.decode())
else:
ansi("red", True, textout = "NO CONNECTED NODES TO SEND REQUEST TO")
if outref in polls:
msg = outref.recv_multipart()
destinataire, correlid = msg[1].split(b"/")
req.send_multipart([destinataire, correlid, msg[2]])
我想在Elixir(或Erlang)中實現類似的東西,但我的首選本機庫chumak似乎沒有實現輪詢。我如何在Erlang/Elixir中實現非阻塞接收,最好是使用Chumak,但如果需要,我會移至另一個Erlang zeroMQ庫?我的套接字模式首選是路由器發送,經銷商收到。
編輯
我的使用情況如下。我有第三方金融服務,根據請求提供數據,異步回答。因此,您可以發送多個請求,並在未指定的時間段後收到回覆,但不一定按照您發送給他們的順序回覆。
所以我需要將這個服務連接到Erlang(實際上是Elixir)和ZeroMQ似乎是一個很好的契合。連接(通過Phoenix)到Erlang/Elixir的多個用戶將發送請求,並且我需要將這些請求傳遞給此服務。
如果其中一個請求發生錯誤,或者第三方服務存在某種問題,則會出現問題。我將阻止 - 等待響應,然後無法爲鳳凰城的新請求提供服務。
基本上我想經常聽新的請求,發送它們,但如果一個請求沒有產生響應,我會有比請求少一個響應,這會導致永久等待。
我明白,如果我分別發送請求,那麼好的會產生響應,所以即使隨着時間的推移,我發送的請求和接收到的響應之間的數字差別也不會很大。也許設計理念是我不應該擔心這個?或者我應該嘗試追蹤一對一的請求響應並以某種方式暫停非響應?這是一種有效的設計模式嗎?
使用一個進程擁有套接字,阻止接收和*反應*接收消息。使用另一個流程來完成其他任務(可能按照計劃)。 *你不想輪詢*,你想建立一個邏輯系統'鏈接進程,執行一些任務的合作*沒有輪詢*。這是Erlang併發方法的重點。如果你解釋你試圖達到的整體效果,它會有很大幫助 - 因爲這幾乎肯定是一個X-Y問題。有一個[Erlang/OTP聊天室](https://chat.stackoverflow.com/rooms/75358/erlang-otp)。 – zxq9
@ zxq9好吧我已經更新了這個問題。它更有意義嗎?我應該把它搬到Erlang房間嗎?問題是我更喜歡Elixir學習者(開始學習,但知道大部分基礎知識,包括GenServer和應用程序)。 –
@ zxq9我想我的問題是,如果Erlang進程在永遠不會接收的接收上阻塞,它將無法處理來自Phoenix的新的*傳入*請求。或者我的錯誤是我應該在另一個過程中處理所述請求? –