2016-10-04 82 views
3

ZeroMQ:多個工人和一個主人的負載均衡。假設我有一個主進程,負責把要並行處理的數據切分開。假設有 1000 個數據塊和 100 個節點來運行計算。

是否有某種方法可以使REQ/REP保持所有員工的工作繁忙?我試過在指南中使用負載平衡器模式,但使用單個客戶端時,sock.recv()將阻塞,直到它收到工作人員的響應。

以下是代碼,稍微修改了zmq指南中的負載均衡器。啓動一名客戶,10名員工,以及中間的負載均衡器/經紀人。我怎樣才能讓所有那些同時工作的員工?

from __future__ import print_function 
from multiprocessing import Process 
import zmq 
import time 
import uuid 
import random 

def client_task():
    """Basic request-reply client using a REQ socket.

    Connects to the broker frontend and issues 100 synchronous WORK
    requests, printing each reply.  REQ sockets are strict lockstep:
    recv() blocks until the reply for the previous send() arrives.
    """
    socket = zmq.Context().socket(zmq.REQ)
    # Socket identities must be bytes on Python 3, not str.
    socket.identity = str(uuid.uuid4()).encode("ascii")
    socket.connect("ipc://frontend.ipc")
    # Send request, get reply
    for i in range(100):
        print("SENDING: ", i)
        # send() requires bytes; the original 'WORK' (str) raises
        # TypeError under Python 3.
        socket.send(b"WORK")
        msg = socket.recv()
        print(msg)

def worker_task():
    """Worker task, using a REQ socket to do load-balancing.

    Signals readiness to the broker, then loops forever: receive one
    routed request, simulate work, reply to the originating client.
    """
    socket = zmq.Context().socket(zmq.REQ)
    # Identity must be bytes on Python 3.
    socket.identity = str(uuid.uuid4()).encode("ascii")
    socket.connect("ipc://backend.ipc")
    # Tell broker we're ready for work
    socket.send(b"READY")
    while True:
        address, empty, request = socket.recv_multipart()
        # Simulate a variable amount of work per request.
        time.sleep(random.randint(1, 4))
        # Keep every frame in bytes: the original b"OK : " + str(...)
        # mixes bytes and str, which raises TypeError under Python 3.
        socket.send_multipart([address, b"", b"OK : " + socket.identity])


def broker():
    """ROUTER/ROUTER broker implementing the zmq-guide load-balancing pattern.

    Clients connect to the frontend, workers to the backend.  Idle worker
    identities are queued in `workers` (least-recently-used first) and each
    incoming client request is routed to the worker at the head of the queue.
    """
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")
    # Initialize main loop state
    workers = []  # LRU queue of idle worker identities (bytes)
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)

    while True:
     sockets = dict(poller.poll())
     if backend in sockets:
      # Handle worker activity on the backend
      request = backend.recv_multipart()
      # Frame 0 is the worker identity; frame 2 is either the b"READY"
      # handshake or the client address a reply is destined for.
      worker, empty, client = request[:3]
      if not workers:
       # Poll for clients now that a worker is available
       poller.register(frontend, zmq.POLLIN)
      workers.append(worker)
      if client != b"READY" and len(request) > 3:
       # If client reply, send rest back to frontend
       empty, reply = request[3:]
       frontend.send_multipart([client, b"", reply])

     if frontend in sockets:
      # Get next client request, route to last-used worker
      client, empty, request = frontend.recv_multipart()
      worker = workers.pop(0)
      backend.send_multipart([worker, b"", client, b"", request])
      if not workers:
       # Don't poll clients if no workers are available
       poller.unregister(frontend)

    # Clean up
    # NOTE(review): unreachable — the while-loop above never breaks.
    backend.close()
    frontend.close()
    context.term()

def main():
    """Spawn the broker, the clients and the workers as separate processes.

    Processes are fire-and-forget: none are joined, so main() returns
    immediately while the children keep running until killed.
    """
    NUM_CLIENTS = 1
    NUM_WORKERS = 10

    def start(task, *args):
        # Small helper so every role is launched the same way.
        process = Process(target=task, args=args)
        process.start()

    start(broker)

    for i in range(NUM_CLIENTS):
        start(client_task)

    for i in range(NUM_WORKERS):
        start(worker_task)




# Entry guard is required here: multiprocessing re-imports this module in
# child processes on spawn-based platforms (e.g. Windows).
if __name__ == "__main__":
    main()

回答

1

我想有不同的方法來做到這一點:

- 例如,你可以使用 threading 模塊從單一客戶端併發發出所有請求,大致像這樣:

# Shared accumulator guarded by a re-entrant lock so each client thread
# can safely append its (index, reply) pair.
result_list = [] # Add the result to a list for the example
rlock = threading.RLock()

def client_thread(client_url, request, i):
    """Send one request on a dedicated REQ socket and record the reply.

    Runs in its own thread so many requests can be outstanding at once,
    even though each individual REQ socket is strict send/recv lockstep.
    Appends (i, reply) to the shared result_list under rlock.
    """
    context = zmq.Context.instance()
    socket = context.socket(zmq.REQ)

    socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
    socket.connect(client_url)

    try:
        socket.send(request.encode())
        reply = socket.recv()

        with rlock:
            result_list.append((i, reply))
    finally:
        # Close explicitly: the original leaked one socket per thread,
        # which keeps the shared Context from terminating cleanly.
        socket.close()

def client_task():
    """Launch one client_thread per task so all requests are in flight at once."""
    # tasks = list with all your tasks
    url_client = "ipc://frontend.ipc"
    threads = []
    for i in range(len(tasks)):
        thread = threading.Thread(target=client_thread,
                                  args=(url_client, tasks[i], i,))
        thread.start()
        threads.append(thread)
    # Join the threads we collected: the original built this list but
    # never used it, so the function returned while replies were pending.
    for thread in threads:
        thread.join()

- 你也可以利用事件驅動的庫,例如 asyncio(pyzmq 提供了子模塊 zmq.asyncio,另外還有 aiozmq 這個庫,後者提供了更高的抽象級別)。在這種情況下,你仍然會按順序把請求發送給工作人員,但不會在每個響應上阻塞(也不會讓主循環空轉),並在控制權返回主循環時獲得結果。代碼可能是這樣的:

import asyncio 
import zmq.asyncio 

async def client_async(request, context, i, client_url):
    """One async client: send a single request (REQ) to the ROUTER broker
    and return the reply, without blocking the event loop while waiting."""
    sock = context.socket(zmq.REQ)
    sock.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
    sock.connect(client_url)
    await sock.send(request.encode())
    response = await sock.recv()
    sock.close()
    return response


async def run(loop):
    """Fan every task out as a concurrent client coroutine, gather all replies."""
    # tasks = list full of tasks
    url_client = "ipc://frontend.ipc"
    ctx = zmq.asyncio.Context()
    pending = [
        asyncio.ensure_future(client_async(tasks[idx], ctx, idx, url_client))
        for idx in range(len(tasks))
    ]
    responses = await asyncio.gather(*pending)
    return responses

# Hook pyzmq's asyncio support into the default event loop, then drive
# run() to completion.
# NOTE(review): zmq.asyncio.install() is deprecated in newer pyzmq
# releases (the asyncio Context works without it) — confirm against the
# installed pyzmq version.
zmq.asyncio.install()
loop = asyncio.get_event_loop()
results = loop.run_until_complete(run(loop))

我沒有實際測試過這兩個片段,但它們都來自我在與你的問題類似的配置中使用 ZMQ 的代碼(做了一些修改以適應這個問題)。

+0

我最終在zmq指南中使用了「asynchronous」req/rep模式,最終解決了這個問題。中間代理的輪詢比較理想,但對我的用例來說已經足夠了 – reptilicus