2017-02-21 67 views
1

我試圖讓多處理ServerApp在Windows上工作。我想這個問題是缺少os.fork()功能,所以我必須通過socket不知何故這是不可pickleable(?!)。Python3 Windows多處理傳遞套接字來處理

我已經看到這可能是reduce_handlerebuild_handlemultiprocessing.reductionhere所示,但這些方法在Python 3(?!)中不可用。雖然我有可用的duplicatesteal_handle可用我找不到一個示例如何使用它們,或者我是否需要它們。

此外,我想知道logging是否會在創建新流程時出現問題?

這裏是我的ServerApp樣本:

import logging 
import socket 

from select import select 
from threading import Thread 
from multiprocessing import Queue 
from multiprocessing import Process 
from sys import stdout 
from time import sleep 


class ServerApp(object): 

    logger = logging.getLogger(__name__) 
    logger.setLevel(logging.DEBUG) 
    handler = logging.StreamHandler(stdout) 
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') 
    handler.setFormatter(formatter) 
    logger.addHandler(handler) 


    def conn_handler(self, connection, address, buffer): 

     self.logger.info("[%d] - Connection from %s:%d", self.id, address[0], address[1]) 

     try: 
      while True: 

       command = None 
       received_data = b'' 
       readable, writable, exceptional = select([connection], [], [], 0) # Check for client commands 

       if readable: 
        # Get Command ... There is more code here 
        command = 'Something' 


       if command == 'Something': 
        connection.sendall(command_response) 
       else: 
        print(':(') 

     except Exception as e: 
      print(e) 
     finally: 
      connection.close() 
      self.client_buffers.remove(buffer) 
      self.logger.info("[%d] - Connection from %s:%d has been closed.", self.id, address[0], address[1]) 


    def join(self): 

     while self.listener.is_alive(): 
      self.listener.join(0.5) 


    def acceptor(self): 

     while True: 
      self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, self.ip, self.port) 

      # Accept a connection on the bound socket and fork a child process to handle it. 
      conn, address = self.socket.accept() 

      # Create Queue which will represent buffer for specific client and add it o list of all client buffers 
      buffer = Queue() 
      self.client_buffers.append(buffer) 

      process = Process(target=self.conn_handler, args=(conn, address, buffer)) 
      process.daemon = True 
      process.start() 
      self.clients.append(process) 

      # Close the connection fd in the parent, since the child process has its own reference. 
      conn.close() 


    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', buffer_size=2048): 

     self.id = id 
     self.port = port 
     self.ip = ip 

     self.socket = None 
     self.listener = None 
     self.buffer_size = buffer_size 

     # Additional attributes here.... 

     self.clients = [] 
     self.client_buffers = [] 


    def run(self): 

     # Create TCP socket, bind port and listen for incoming connections 
     self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.socket.bind((self.ip, self.port)) 
     self.socket.listen(5) 

     self.listener = Thread(target=self.acceptor) # Run acceptor thread to handle new connection 
     self.listener.daemon = True 
     self.listener.start() 
+0

你寫了一些代碼,但看不到任何'協議'定義。如果已經接受,則無法定義任何接受規則(什麼是過濾器?)。 – dsgdfg

+0

@dsgdfg不知道我是否正確,但每個連接都應該被單獨的進程接受和處理。 – sstevan

回答

3

允許連接酸洗(包括插座)爲python3,你應該使用mulitprocessing.allow_connection_pickling。它在ForkingPickler中註冊套接字減速器。例如:

import socket 
import multiprocessing as mp 
mp.allow_connection_pickling() 


def _test_connection(conn): 
    msg = conn.recv(2) 
    conn.send(msg) 
    conn.close() 
    print("ok") 

if __name__ == '__main__': 
    server, client = socket.socketpair() 

    p = mp.Process(target=_test_connection, args=(server,)) 
    p.start() 

    client.settimeout(5) 

    msg = b'42' 
    client.send(msg) 
    assert client.recv(2) == msg 

    p.join() 
    assert p.exitcode == 0 

    client.close() 
    server.close() 

我還注意到,您有unrealted到socket酸洗其他一些問題。

  • 使用時self.conn_handler作爲目標時,多會嘗試以酸洗整個對象self。這是一個問題,因爲您的對象包含一些不能被酸洗的Thread。因此,您應該從關閉目標函數中刪除self。可以通過使用@staticmethod修飾器並刪除函數中所有提及的self來完成。

  • 此外,logging模塊不處理多個進程。基本上,啓動的Process的所有日誌都將丟失您的當前代碼。要解決這個問題,您可以在啓動第二個Process(在conn_handler的開頭)或使用multiprocessing日誌工具啓動新的logging

這可以讓這樣的事情:

import logging 
import socket 

from select import select 
from threading import Thread 
from multiprocessing import util, get_context 
from sys import stdout 
from time import sleep 

util.log_to_stderr(20) 
ctx = get_context("spawn") 


class ServerApp(object): 

    logger = logging.getLogger(__name__) 
    logger.setLevel(logging.DEBUG) 
    handler = logging.StreamHandler(stdout) 
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') 
    handler.setFormatter(formatter) 
    logger.addHandler(handler) 

    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', 
       buffer_size=2048): 

     self.id = id 
     self.port = port 
     self.ip = ip 

     self.socket = None 
     self.listener = None 
     self.buffer_size = buffer_size 

     # Additional attributes here.... 

     self.clients = [] 
     self.client_buffers = [] 

    @staticmethod 
    def conn_handler(id, connection, address, buffer): 

     print("test") 
     util.info("[%d] - Connection from %s:%d", id, address[0], address[1]) 

     try: 
      while True: 

       command = None 
       received_data = b'' 
       # Check for client commands 
       readable, writable, exceptional = select([connection], [], [], 
                 0) 

       if readable: 
        # Get Command ... There is more code here 
        command = 'Something' 

       if command == 'Something': 
        connection.sendall(b"Coucouc") 
        break 
       else: 
        print(':(') 
       sleep(.1) 

     except Exception as e: 
      print(e) 
     finally: 
      connection.close() 
      util.info("[%d] - Connection from %s:%d has been closed.", id, 
        address[0], address[1]) 
      print("Close") 

    def join(self): 

     while self.listener.is_alive(): 
      self.listener.join(0.5) 

    def acceptor(self): 

     while True: 
      self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, 
          self.ip, self.port) 

      # Accept a connection on the bound socket and fork a child process 
      # to handle it. 
      conn, address = self.socket.accept() 

      # Create Queue which will represent buffer for specific client and 
      # add it o list of all client buffers 
      buffer = ctx.Queue() 
      self.client_buffers.append(buffer) 

      process = ctx.Process(target=self.conn_handler, 
           args=(self.id, conn, address, buffer)) 
      process.daemon = True 
      process.start() 
      self.clients.append(process) 

      # Close the connection fd in the parent, since the child process 
      # has its own reference. 
      conn.close() 

    def run(self): 

     # Create TCP socket, bind port and listen for incoming connections 
     self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.socket.bind((self.ip, self.port)) 
     self.socket.listen(5) 

     # Run acceptor thread to handle new connection 
     self.listener = Thread(target=self.acceptor) 
     self.listener.daemon = True 
     self.listener.start() 

     self.listener.join() 


def main(): 
    app = ServerApp(0) 
    app.run() 


if __name__ == '__main__': 
    main() 

我只測試了它在Unix和python3.6但正如我在Windows中使用的菌種方面, which should behave like the Process`它不應該有太多的行爲不同。

+0

抱歉,延遲。我已經測試了'allow_connection_pickling',但它沒有效果。但是,我注意到我收到兩個不同的錯誤(運行相同的代碼兩次)。這是[第一個](http://pastebin.com/zyS2Sbtm),這裏是[第二個](http://pastebin.com/7FQ1nvxN)。當我沒有將偵聽器分配爲ServerApp屬性(只是'listener'而不是'self.listener')時,我沒有錯誤,但處理程序進程也不會執行。 – sstevan

+0

這不是'套接字酸洗問題。如果你閱讀這些錯誤,它不會醃製一個'_thread.Lock'和一些'io'對象。我會說這與整個'ServerApp'對象的酸洗有關,當你使用實例方法來啓動新的'Process'時需要。您應該爲'conn_handler'使用'@ staticmethod'裝飾器,或者將其從類中移除。這也是一個很好的做法,因爲醃製整個對象是不安全的(例如,如果它處理一些密碼)。另外,請嘗試提供一些重現錯誤的基本腳本以允許測試。 –

+0

另外,作爲一個很好的例子,你應該使用'multiprocessing.utils.log_to_stderr'和'multiprocessing.utils.debug/info'來獲得與你的'Process'相一致的日誌記錄。如果您需要使用自定義日誌記錄,則應該在目標函數的開始處啓動它,因爲'logging'只能用於一個'Process'。 –