2016-12-14 39 views
7

我想實現一個tcp'echo服務器'。 簡單的東西:Python的多重處理和網絡在Windows上

  1. 客戶端發送消息到服務器。
  2. 服務器接收到該消息
  3. 服務器消息轉換爲大寫
  4. 服務器發送修改後的消息到客戶端
  5. 客戶打印的響應。

它運行良好,所以我決定並行服務器;使其能夠在時間處理多個客戶端。 由於大多數Python解釋器都有GIL,所以多線程不會削減它。 我不得不使用多處理器......而男孩,這是事情進展緩慢的地方。

我使用Windows 10 x64和WinPython與Python 3.5.2 x64相適應。

我的想法是創建一個套接字,初始化它(綁定和偵聽),創建子進程並將套接字傳遞給子節點。 但是對於我的愛......我不能做這項工作,我的子流程幾乎立即死亡。 最初,我有一些問題'酸洗'的插座... 所以我GOOGLE了一下,並認爲這是問題。 所以我嘗試通過一個多處理隊列,通過管道傳遞我的套接字,我最後一次嘗試是'forkpickling',並在創建處理過程中將它作爲字節對象傳遞。 沒有用。

有人可以在這裏擺脫一些光? 告訴我什麼是錯的? 也許整個想法(共享套接字)是不好的...如果是這樣,請告訴我如何實現我的最初目標:使我的服務器能夠一次處理多個客戶端(在Windows上)(不要告訴我關於線程,我們都知道python的線程不會削減它¬¬)

它也值得注意的是,沒有文件是由調試功能創建的。 我相信,沒有任何程序能夠運行足夠長的時間來運行它。

我的服務器代碼的典型輸出(只運行之間的區別是進程號):

Server is running... 
Degree of parallelism: 4 
Socket created. 
Socket bount to: ('', 0) 
Process 3604 is alive: True 
Process 5188 is alive: True 
Process 6800 is alive: True 
Process 2844 is alive: True 

Press ctrl+c to kill all processes. 

Process 3604 is alive: False 
Process 3604 exit code: 1 
Process 5188 is alive: False 
Process 5188 exit code: 1 
Process 6800 is alive: False 
Process 6800 exit code: 1 
Process 2844 is alive: False 
Process 2844 exit code: 1 
The children died... 
Why god? 
WHYYyyyyy!!?!?!? 

服務器代碼:

# Imports 
import socket 
import packet 
import sys 
import os 
from time import sleep 
import multiprocessing as mp 
import pickle 
import io 

# Constants 
DEGREE_OF_PARALLELISM = 4 
DEFAULT_HOST = "" 
DEFAULT_PORT = 0 

def _parse_cmd_line_args(): 
    arguments = sys.argv 
    if len(arguments) == 1: 
     return DEFAULT_HOST, DEFAULT_PORT 
    else: 
     raise NotImplemented() 

def debug(data): 
    pid = os.getpid() 
    with open('C:\\Users\\Trauer\\Desktop\\debug\\'+str(pid)+'.txt', mode='a', 
       encoding='utf8') as file: 
     file.write(str(data) + '\n') 

def handle_connection(client): 
    client_data = client.recv(packet.MAX_PACKET_SIZE_BYTES) 
    debug('received data from client: ' + str(len(client_data))) 
    response = client_data.upper() 
    client.send(response)  
    debug('sent data from client: ' + str(response)) 

def listen(picklez):  
    debug('started listen function') 

    pid = os.getpid() 
    server_socket = pickle.loads(picklez) 
    debug('acquired socket') 

    while True: 
     debug('Sub process {0} is waiting for connection...'.format(str(pid))) 

     client, address = server_socket.accept() 
     debug('Sub process {0} accepted connection {1}'.format(str(pid), 
       str(client))) 

     handle_connection(client)   
     client.close() 
     debug('Sub process {0} finished handling connection {1}'. 
       format(str(pid),str(client))) 

if __name__ == "__main__":  
# Since most python interpreters have a GIL, multithreading won't cut 
# it... Oughta bust out some process, yo! 
    host_port = _parse_cmd_line_args() 
    print('Server is running...') 
    print('Degree of parallelism: ' + str(DEGREE_OF_PARALLELISM)) 

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
    print('Socket created.') 

    server_socket.bind(host_port) 
    server_socket.listen(DEGREE_OF_PARALLELISM) 
    print('Socket bount to: ' + str(host_port))   

    buffer = io.BytesIO() 
    mp.reduction.ForkingPickler(buffer).dump(server_socket) 
    picklez = buffer.getvalue() 

    children = [] 
    for i in range(DEGREE_OF_PARALLELISM):   
     child_process = mp.Process(target=listen, args=(picklez,)) 
     child_process.daemon = True 
     child_process.start() 
     children.append(child_process) 

     while not child_process.pid: 
      sleep(.25) 

     print('Process {0} is alive: {1}'.format(str(child_process.pid), 
       str(child_process.is_alive())))  
    print()  

    kids_are_alive = True 
    while kids_are_alive: 
     print('Press ctrl+c to kill all processes.\n') 
     sleep(1) 

     exit_codes = [] 
     for child_process in children: 
      print('Process {0} is alive: {1}'.format(str(child_process.pid), 
       str(child_process.is_alive()))) 
      print('Process {0} exit code: {1}'.format(str(child_process.pid), 
       str(child_process.exitcode))) 
      exit_codes.append(child_process.exitcode) 

     if all(exit_codes): 
      # Why do they die so young? :(
      print('The children died...') 
      print('Why god?') 
      print('WHYYyyyyy!!?!?!?') 
      kids_are_alive = False 

編輯:固定的「聽」的簽名。我的程序仍然會立即死亡。

edit2:用戶cmidi指出,這段代碼在Linux上工作;所以我的問題是:我如何'在Windows上做這項工作'?

+1

多處理自動處理將子套接字傳遞給子進程。 Windows實現使用套接字'share'方法和'fromshare'函數。 – eryksun

+1

在主進程中調用'accept()',並使用共享的'multiprocessing.Queue'將生成的'(conn,addr)'元組傳遞給worker。 – eryksun

+0

沒有變化,eryksun。我的過程仍然瞬間消失。 – Trauer

回答

3

您可以直接將套接字傳遞給子進程。多註冊了這個減少,爲此,Windows實現使用下列DupSocket類從multiprocessing.resource_sharer

class DupSocket(object): 
    '''Picklable wrapper for a socket.''' 
    def __init__(self, sock): 
     new_sock = sock.dup() 
     def send(conn, pid): 
      share = new_sock.share(pid) 
      conn.send_bytes(share) 
     self._id = _resource_sharer.register(send, new_sock.close) 

    def detach(self): 
     '''Get the socket. This should only be called once.''' 
     with _resource_sharer.get_connection(self._id) as conn: 
      share = conn.recv_bytes() 
      return socket.fromshare(share) 

這調用Windows套接字share方法,它返回從調用WSADuplicateSocket協議信息緩衝區。它向資源共享者註冊,通過連接將此緩衝區發送給子進程。小孩又調用detach,它接收協議信息緩衝區並通過socket.fromshare重建套接字。

它不直接關係到你的問題,但我建議你重新設計服務器改爲調用accept在主要的過程,這是(在Python的socketserver.ForkingTCPServer模塊例如)這通常做的方式。將生成的(conn, address)元組通過multiprocessing.Queue傳遞給第一個可用的工作人員,該進程由進程池中的所有工作人員共享。或考慮使用multiprocessing.Poolapply_async

+0

我明白了。 [this](http://pastebin.com/uRjCBuaD)方法呢,這是合法的嗎? – Trauer

+0

我還沒有檢查過監聽套接字被複制到另一個進程時如何處理積壓。如果每個副本都有自己的積壓,那麼它可能不是你想要的。 – eryksun

+0

我明白了。最後一個問題,我不再打擾你了,嘿嘿。你知道我在哪裏可以找到關於這方面的一些文件? – Trauer

0

def listen()目標/啓動你的子進程不帶任何參數,但您提供系列化插座作爲參數args=(picklez,)子進程,這將造成立即的子進程,並退出異常。

TypeError: listen() takes no arguments (1 given) 

def listen(picklez)應該解決的問題,這將提供一個參數傳遞給你的子進程的目標。

+0

事實上,最初發布的功能並不完整。我在這裏發佈了不完整/不正確的代碼。 正確的簽名是listen(picklez)。我的觀點是:可悲的是,這不是問題;原始代碼有正確的簽名,但我的孩子進程仍然立即死亡,但謝謝你的答案。 – Trauer

+0

嚴格地說'accept'應該被鎖定在不同的進程之間訪問,並且代碼似乎在linux環境下工作,即使沒有序列化套接字 – cmidi

+0

我對此感到害怕......我有一種感覺,這是一個Windows併發症。 我會將這些信息添加到我的問題中。謝謝你cmidi。 – Trauer