
How do I dynamically create per-process Queues in Python multiprocessing?

I want to dynamically create multiple Processes, where each instance has a queue for incoming messages from other instances, and each instance can also create new instances. So we end up with a network of processes all sending to each other.

The code below would do what I want: it uses a Manager.dict() to store the queues, so that updates are propagated, and a Lock() to protect write access to the queues. However, when a new queue is added it throws "RuntimeError: Queue objects should only be shared between processes through inheritance".

The problem is that at startup we don't know how many queues we will eventually need, so we have to create them dynamically. But since we can't share queues except at construction time, I don't know how to do that.

I know one possibility would be to make queues a global variable instead of a managed one passed in to __init__: the problem then, as I understand it, is that additions to the queues variable wouldn't be propagated to the other processes.

EDIT I am working on evolutionary algorithms. An EA is a machine-learning technique. An EA simulates a "population", which evolves under survival of the fittest, crossover, and mutation. In parallel EAs, as here, we also have migration between populations, which corresponds to interprocess communication. Islands can also spawn new islands, so we need a way to send messages between dynamically created processes.

import random, time 
from multiprocessing import Process, Queue, Lock, Manager, current_process 
try: 
    from queue import Empty as EmptyQueueException 
except ImportError: 
    from Queue import Empty as EmptyQueueException 

class MyProcess(Process):
    def __init__(self, queues, lock):
        super(MyProcess, self).__init__()
        self.queues = queues
        self.lock = lock
        # acquire the lock and add a new queue for this process
        with self.lock:
            self.id = len(list(self.queues.keys()))
            self.queues[self.id] = Queue()

    def run(self):
        while len(list(self.queues.keys())) < 10:

            # make a new process
            new = MyProcess(self.queues, self.lock)
            new.start()

            # send a message to a random process
            dest_key = random.choice(list(self.queues.keys()))
            dest = self.queues[dest_key]
            dest.put("hello to %s from %s" % (dest_key, self.id))

            # receive messages
            message = True
            while message:
                try:
                    message = self.queues[self.id].get(False) # don't block
                    print("%s received: %s" % (self.id, message))
                except EmptyQueueException:
                    break

            # what queues does this process know about?
            print("%d: I know of %s" %
                  (self.id, " ".join([str(id) for id in self.queues.keys()])))

            time.sleep(1)

if __name__ == "__main__":
    # Construct MyProcess with a Manager.dict for storing the queues
    # and a lock to protect write access. Start.
    MyProcess(Manager().dict(), Lock()).start()
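A side note on the RuntimeError above: the restriction applies to raw multiprocessing.Queue objects, which can only be handed to a child at construction time. Queues created through the Manager are proxy objects, and on a reasonably recent Python 3 they can generally be stored in a Manager.dict and looked up from other processes. A minimal sketch of that idea, assuming a recent Python 3 and leaving out the dynamic-spawning design above:

    # Minimal sketch: store a Manager-created queue proxy in a Manager.dict.
    # Assumes a reasonably recent Python 3; not the full dynamic design above.
    from multiprocessing import Process, Manager

    def worker(queues, my_id):
        # look up our own queue in the shared dict and read one message
        msg = queues[my_id].get()
        print("%d received: %s" % (my_id, msg))

    if __name__ == "__main__":
        manager = Manager()
        queues = manager.dict()
        queues[0] = manager.Queue()   # a proxy object, so no RuntimeError here
        p = Process(target=worker, args=(queues, 0))
        p.start()
        queues[0].put("hello from the parent")
        p.join()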

Answers


I'm not entirely sure what your use case actually is here. Perhaps if you elaborate a bit on why you want each process to dynamically spawn a child with a connected queue, it will be clearer what the right solution would be in this situation.

In any case, the issue at hand seems to be that there isn't currently a really good way to dynamically create pipes or queues with multiprocessing.

I think that if you're willing to spawn threads within each of your processes, you may be able to use them for back-and-forth communication. Instead of spawning threads, though, I took an approach using network sockets and select() to communicate between processes.

Dynamic process spawning and network sockets may still be somewhat flaky, depending on how multiprocessing cleans up file descriptors when spawning/forking a new process, and the solution will most likely be easier to get working on *nix derivatives. If you're worried about socket overhead, you could use unix domain sockets to be more lightweight, at the cost of added complexity if you later want to run nodes on multiple worker machines.
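For illustration, here is a minimal sketch of the unix domain socket variant (my own sketch, assuming a *nix platform and a made-up socket path): only the address family and the address itself change relative to the TCP version in the example below.

    # Sketch of the unix domain socket variant (assumed; *nix only).
    # The path is hypothetical; it would replace the ("127.0.0.1", 7000 + n)
    # addresses stored in the shared process dict in the example below.
    import os, socket

    SOCK_PATH = "/tmp/mpp-node-0.sock"

    # listener side: unlink any stale socket file, then bind to a path
    if os.path.exists(SOCK_PATH):
        os.unlink(SOCK_PATH)
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(SOCK_PATH)
    server.listen(5)

    # sender side: connect to the path instead of a (host, port) pair
    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    client.connect(SOCK_PATH)
    client.sendall(b"hello over a unix domain socket")
    client.close()
    server.close()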

Anyway, here's an example that uses network sockets and a global process list to accomplish this, since I couldn't find a good way to make multiprocessing do it.

import collections 
import multiprocessing 
import random 
import select 
import socket 
import time 


class MessagePassingProcess(multiprocessing.Process):
    def __init__(self, id_, processes):
        self.id = id_
        self.processes = processes
        self.queue = collections.deque()
        super(MessagePassingProcess, self).__init__()

    def run(self):
        print "Running"
        inputs = []
        outputs = []
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        address = self.processes[self.id]["address"]
        print "Process %s binding to %s" % (self.id, address)
        server.bind(address)
        server.listen(5)
        inputs.append(server)
        process = self.processes[self.id]
        process["listening"] = True
        self.processes[self.id] = process
        print "Process %s now listening! (%s)" % (self.id, process)
        while inputs:
            readable, writable, exceptional = select.select(inputs,
                                                            outputs,
                                                            inputs,
                                                            0.1)
            for sock in readable:
                print "Process %s has a readable socket: %s" % (self.id, sock)
                if sock is server:
                    print "Process %s has a readable server socket: %s" % (self.id,
                                                                           sock)
                    conn, addr = sock.accept()
                    conn.setblocking(0)
                    inputs.append(conn)
                else:
                    data = sock.recv(1024)
                    if data:
                        self.queue.append(data)
                        print "non-server readable socket with data"
                    else:
                        inputs.remove(sock)
                        sock.close()
                        print "non-server readable socket with no data"

            for sock in exceptional:
                print "exception occurred on socket %s" % (sock)
                inputs.remove(sock)
                sock.close()

            while len(self.queue) >= 1:
                print "Received:", self.queue.pop()

            # send a message to a random process:
            random_id = random.choice(list(self.processes.keys()))
            print "%s attempting to send message to %s" % (self.id, random_id)
            random_process = self.processes[random_id]
            print "random_process:", random_process
            if random_process["listening"]:
                random_address = random_process["address"]
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    s.connect(random_address)
                except socket.error:
                    print "%s failed to send to %s" % (self.id, random_id)
                else:
                    s.send("Hello World!")
                finally:
                    s.close()

            time.sleep(1)

if __name__ == "__main__":
    print "hostname:", socket.getfqdn()
    print dir(multiprocessing)
    manager = multiprocessing.Manager()
    processes = manager.dict()
    joinable = []
    for n in xrange(multiprocessing.cpu_count()):
        mpp = MessagePassingProcess(n, processes)
        processes[n] = {"id": n,
                        "address": ("127.0.0.1", 7000 + n),
                        "listening": False,
                        }
        print "processes[%s] = %s" % (n, processes[n])
        mpp.start()
        joinable.append(mpp)
    for process in joinable:
        process.join()

With some love in the way of polish and testing, since this seems like something people would use if it were available in the standard library, it could be a logical extension of multiprocessing.Process and/or multiprocessing.Pool. Creating a DynamicQueue class that uses sockets to be discoverable by other queues might also be reasonable.
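To make that last suggestion concrete, here is a rough, hypothetical sketch of what a socket-backed DynamicQueue interface might look like (my own sketch, not part of the answer's code): the owning process drains its listener, whether a select() loop or a socketserver thread, into the queue via feed(), while any process that knows the address can put().

    # Hypothetical sketch of a socket-backed DynamicQueue (not from the answer).
    # put() works from any process that knows the owner's address; feed() and
    # get() are used only in the owning process, alongside its listener loop.
    import collections, pickle, socket

    class DynamicQueue(object):
        def __init__(self, address):
            self.address = address          # (host, port) of the owning process
            self._items = collections.deque()

        def put(self, obj):
            # connect to the owner's listener and send one pickled item
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                s.connect(self.address)
                s.sendall(pickle.dumps(obj))
            finally:
                s.close()

        def feed(self, raw_bytes):
            # called by the owner's listener for each complete message received
            self._items.append(pickle.loads(raw_bytes))

        def get(self):
            # called only in the owning process
            return self._items.popleft()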

Anyway, hope it helps. Please update if you come up with a better way to make this work.


Wow, this is really useful, thanks. I've added some information about the use case to the original question. The advantage of Queues is not having to think about buffers, pickling, incomplete recv()s and so on. But I can now see that connected queues aren't essential: sockets work fine, with just a bit of extra work. I took your code and changed it: it can now send objects via pickle, it can create new processes, and each new process now adds *itself* to the dict. I'll add my code as a separate answer. – jmmcd


Really glad it helped. – stderr


This code is based on the accepted answer. It's in Python 3 because OSX Snow Leopard segfaults on some uses of the multiprocessing machinery.

#!/usr/bin/env python3 

import collections 
from multiprocessing import Process, Manager, Lock, cpu_count 
import random 
import select 
import socket 
import time 
import pickle 

class Message:
    def __init__(self, origin):
        self.type = "long_msg"
        self.data = "X" * 3000
        self.origin = origin

    def __str__(self):
        return "%s %d" % (self.type, self.origin)

class MessagePassingProcess(Process):
    def __init__(self, processes, lock):
        self.lock = lock
        self.processes = processes
        with self.lock:
            self.id = len(list(processes.keys()))
            process_dict = {"id": self.id,
                            "address": ("127.0.0.1", 7000 + self.id),
                            "listening": False
                            }
            self.processes[self.id] = process_dict
        print("new process: processes[%s] = %s" % (self.id, processes[self.id]))
        self.queue = collections.deque()
        super(MessagePassingProcess, self).__init__()

    def run(self):
        print("Running")
        self.processes[self.id]["joinable"] = True
        inputs = []
        outputs = []
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        address = self.processes[self.id]["address"]
        print("Process %s binding to %s" % (self.id, address))
        server.bind(address)
        server.listen(5)
        inputs.append(server)
        process = self.processes[self.id]
        process["listening"] = True
        self.processes[self.id] = process
        print("Process %s now listening! (%s)" % (self.id, process))
        while inputs and len(list(self.processes.keys())) < 10:
            readable, writable, exceptional = select.select(inputs,
                                                            outputs,
                                                            inputs,
                                                            0.1)
            # read incoming messages
            for sock in readable:
                print("Process %s has a readable socket: %s" % (self.id, sock))
                if sock is server:
                    print("Process %s has a readable server socket: %s" %
                          (self.id, sock))
                    conn, addr = sock.accept()
                    conn.setblocking(0)
                    inputs.append(conn)
                else:
                    data = True
                    item = bytes() # empty bytes object, to be added to
                    recvs = 0
                    while data:
                        data = sock.recv(1024)
                        item += data
                        recvs += 1
                    if len(item):
                        self.queue.append(item)
                        print("non-server readable socket: recvd %d bytes in %d parts"
                              % (len(item), recvs))
                    else:
                        inputs.remove(sock)
                        sock.close()
                        print("non-server readable socket: nothing to read")

            for sock in exceptional:
                print("exception occurred on socket %s" % (sock))
                inputs.remove(sock)
                sock.close()

            while len(self.queue):
                msg = pickle.loads(self.queue.pop())
                print("received: " + str(msg))

            # send a message to a random process:
            random_id = random.choice(list(self.processes.keys()))
            print("%s attempting to send message to %s" % (self.id, random_id))
            random_process = self.processes[random_id]
            if random_process["listening"]:
                random_address = random_process["address"]
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    s.connect(random_address)
                except socket.error:
                    print("%s failed to send to %s" % (self.id, random_id))
                else:
                    item = pickle.dumps(Message(self.id))
                    print("sending a total of %d bytes" % len(item))
                    s.sendall(item)
                finally:
                    s.close()

            # make a new process
            if random.random() < 0.1:
                mpp = MessagePassingProcess(self.processes, self.lock)
                mpp.start()
            else:
                time.sleep(1.0)
        print("process %d finished looping" % self.id)


if __name__ == "__main__":
    manager = Manager()
    processes = manager.dict()
    lock = Lock()
    # make just one process: it will make more
    mpp = MessagePassingProcess(processes, lock)
    mpp.start()
    # this doesn't join on all the other processes created
    # subsequently
    mpp.join()

This uses the standard library's socketserver to avoid programming select() by hand. In this version, we start a socket server in a separate thread so that each process can do computation (well, pretend to do it) in its main loop.

#!/usr/bin/env python3 

# Each Node is an mp.Process. It opens a client-side socket to send a 
# message to another Node. Each Node listens using a separate thread 
# running a socketserver (so avoiding manual programming of select()), 
# which itself starts a new thread to handle each incoming connection. 
# The socketserver puts received messages on an mp.Queue, where they 
# are picked up by the Node for processing once per loop. This setup 
# allows the Node to do computation in its main loop. 

import multiprocessing as mp 
import threading, random, socket, socketserver, time, pickle, queue 

class Message:
    def __init__(self, origin):
        self.type = "long_message"
        self.data = "X" * random.randint(0, 2000)
        self.origin = origin

    def __str__(self):
        return "Message of type %s, length %d from %d" % (
            self.type, len(self.data), self.origin)

class Node(mp.Process):
    def __init__(self, nodes, lock):
        super().__init__()

        # Add this node to the Manager.dict of node descriptors.
        # Write-access is protected by a Lock.
        self.nodes = nodes
        self.lock = lock
        with self.lock:
            self.id = len(list(nodes.keys()))
            host = "127.0.0.1"
            port = 7022 + self.id
            node = {"id": self.id, "address": (host, port), "listening": False}
            self.nodes[self.id] = node
        print("new node: nodes[%s] = %s" % (self.id, nodes[self.id]))

        # Set up socketserver.

        # don't know why collections.deque or queue.Queue don't work here.
        self.queue = mp.Queue()

        # This MixIn usage is directly from the python.org
        # socketserver docs
        class ThreadedTCPServer(socketserver.ThreadingMixIn,
                                socketserver.TCPServer):
            pass

        class HandlerWithQueue(socketserver.BaseRequestHandler):
            # Something of a hack: using class variables to give the
            # Handler access to this Node-specific data
            handler_queue = self.queue
            handler_id = self.id
            def handle(self):
                # could receive data in multiple chunks, so loop and
                # concatenate
                item = bytes()
                recvs = 0
                data = True
                while data:
                    data = self.request.recv(4096)
                    item += data
                    recvs += 1
                if len(item):
                    # Receive a pickle here and put it straight on the
                    # queue. It will be unpickled when taken off the queue.
                    print("%d: socketserver received %d bytes in %d recv()s"
                          % (self.handler_id, len(item), recvs))
                    self.handler_queue.put(item)

        self.server = ThreadedTCPServer((host, port), HandlerWithQueue)
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        self.server_thread.setDaemon(True) # Tell it to exit when the Node exits.
        self.server_thread.start()
        print("%d: server loop running in thread %s" %
              (self.id, self.server_thread.getName()))

        # Now ready to receive
        with self.lock:
            # Careful: if we assign directly to
            # self.nodes[self.id]["listening"], the new value *won't*
            # be propagated to other Nodes by the Manager.dict. Have
            # to use this hack to re-assign the Manager.dict key.
            node = self.nodes[self.id]
            node["listening"] = True
            self.nodes[self.id] = node

    def send(self):
        # Find a destination. All listening nodes are eligible except self.
        dests = [node for node in self.nodes.values()
                 if node["id"] != self.id and node["listening"]]
        if len(dests) < 1:
            print("%d: no node to send to" % self.id)
            return
        dest = random.choice(dests)
        print("%d: sending to %s" % (self.id, dest["id"]))

        # send
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(dest["address"])
        except socket.error:
            print("%s: failed to send to %s" % (self.id, dest["id"]))
        else:
            item = pickle.dumps(Message(self.id))
            s.sendall(item)
        finally:
            s.close()

    # Check our queue for incoming messages.
    def receive(self):
        while True:
            try:
                message = pickle.loads(self.queue.get(False))
                print("%d: received %s" % (self.id, str(message)))
            except queue.Empty:
                break

    def run(self):
        print("%d: in run()" % self.id)
        # Main loop. Loop until at least 10 Nodes exist. Because of
        # parallel processing we might get a few more
        while len(list(self.nodes.keys())) < 10:
            time.sleep(random.random() * 0.5) # simulate heavy computation
            self.send()
            time.sleep(random.random() * 0.5) # simulate heavy computation
            self.receive()
            # maybe make a new node
            if random.random() < 0.1:
                new = Node(self.nodes, self.lock)
                new.start()
        # Seems natural to call server_thread.shutdown() here, but it
        # hangs. But since we've set the thread to be a daemon, it
        # will exit when this process does.
        print("%d: finished" % self.id)

if __name__ == "__main__":
    manager = mp.Manager()
    nodes = manager.dict()
    lock = mp.Lock()
    # make just one node: it will make more
    node0 = Node(nodes, lock)
    node0.start()
    # This doesn't join on all the other nodes created subsequently.
    # But everything seems to work out ok.
    node0.join()

Note that select and socketserver are very different models: socketserver uses threads/forking with blocking sockets, while select uses a single process with non-blocking sockets. – stderr


@Mike OK, thanks. I hope my understanding is right: from the listener's point of view, non-blocking sockets handled in-process have roughly the same effect as blocking sockets handled in a dedicated process/thread: in both cases, messages end up being processed once per iteration. From the sender's point of view, though, a fire-and-forget send is easier against a *blocking* listener, since it's "guaranteed" (in practice, not guaranteed) to be listening rather than off computing in another part of its loop. – jmmcd