2017-12-18 90 views
1

我玩弄瞭解多線程處理一個客戶端的消息,所以我寫了下面的客戶機/服務器應用程序,其中,服務器發送命令給客戶端,客戶端檢查此命令,如果它等於到'a'它發送一個回覆到服務器。Python的多線程服務器可以在同一時間

在我創建了兩個插槽和一個線程的服務器代碼;第一個套接字將命令發送(發佈)到所有連接(訂閱)的客戶端。在線程第二插座等待來自客戶的任何答覆,但因爲線程執行一些阻塞的操作(如存儲客戶端在數據庫中發送的信息),它可以在同一時間,即使插座(REQ-REP處理一個客戶端套接字)可以同時接收多條消息。

server.py

import zmq 
import logging 
import threading 
import time 

logging.basicConfig(level=logging.DEBUG) 


class Server(object): 
    def __init__(self): 
     self.context = zmq.Context() 
     self.pub_port = 7777 
     self.rep_port = 7778 

     self.pub_socket = None 
     self.rep_socket = None 
     self.interface = "*" 

    def bind_ports(self): 
     logging.debug("[bind_ports] binding the ports....") 
     self.pub_socket = self.context.socket(zmq.PUB) 
     pub_bind_str = "tcp://{}:{}".format(self.interface, self.pub_port) 
     self.pub_socket.bind(pub_bind_str) 

     self.rep_socket = self.context.socket(zmq.REP) 
     rep_bind_str = "tcp://{}:{}".format(self.interface, self.rep_port) 
     self.rep_socket.bind(rep_bind_str) 

    def received_info(self): 
     while True: 
      # logging.debug("[received_flow] ") 
      cl_data = self.rep_socket.recv_json() 
      logging.info("[received_data] data <{}>".format(flow)) 
      self.rep_socket.send(b"\x00") 
      self.blocking_op(cl_data) 

    def blocking_op(self, data): 
     time.sleep(1) # simulating some blocking operations e.g. storing info in a database 

    def push_instruction(self, cmd): 
     logging.debug("[push_inst] Sending the instruction <%s> to the clients...", 
     # logging.debug("[push_inst] Sending the instruction <%s> to the agents ...", 
     cmd) 
     instruction = {"cmd": cmd} 
     self.pub_socket.send_json(instruction) 

    def create_thread(self): 
     thread = threading.Thread(target=self.received_info) 
     thread.daemon = True 
     thread.start() 
     logging.debug("[create_thread] Thread created <{}>".format(
                 thread.is_alive())) 

    def start_main_loop(self): 
     logging.debug("[start_main_loop] Loop started....") 
     self.bind_ports() 
     self.create_thread() 

     while True: 
      cmd = input("Enter your command: ") 
      self.push_instruction(cmd) 

if __name__ == "__main__": 
    Server().start_main_loop() 

client.py

import zmq 
import logging 
import random 
import time 

logging.basicConfig(level=logging.DEBUG) 

class Client(object): 
    def __init__(self): 
     self.context = zmq.Context() 
     self.sub_socket = None 
     self.req_socket = None 

     self.pub_port = 7777 
     self.req_port = 7778 
     self.server_ip = 'localhost' 

     self.client_id = "" 

    def connect_to_server(self): 
     logging.debug("[conn_to_serv] Connecting to the server ....") 
     self.sub_socket = self.context.socket(zmq.SUB) 
     self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, "") 
     conn_str = "tcp://{}:{}".format(self.server_ip, self.pub_port) 
     self.sub_socket.connect(conn_str) 

     self.req_socket = self.context.socket(zmq.REQ) 
     req_conn_str = "tcp://{}:{}".format(self.server_ip, self.req_port) 
     self.req_socket.connect(req_conn_str) 

    def get_instruction(self): 
     inst = self.sub_socket.recv_json() 
     logging.debug("[get_inst] Server sent inst") 
     cmd = inst["cmd"] 
     return cmd 
    def send_flow(self, x, y): 
     flow = { 
      "client_id": self.client_id, 
      "x": x, 
      "y": y 
     } 
     self.req_socket.send_json(flow) 

    def start_main_loop(self): 
     logging.debug("starting the main loop ....") 
     self.client_id = input("What is your id: ") 
     self.connect_to_server() 

     while True: 
      inst = self.get_instruction() 
      logging.info("[Main_loop] inst<{}>".format(inst)) 
      if inst == "a": 
       # time.sleep(random.uniform(.6, 1.5)) 
       self.send_flow("xxx", "yyy") 
       self.req_socket.recv() 
       logging.debug("[main_loop] server received the flow") 

if __name__ == "__main__": 
    Client().start_main_loop() 

我將不勝感激,如果有人能幫助我提高了服務器,以便它可以在爲多個客戶的信息同時。

+0

如果你的反應處理塊或需要很長的時間,那麼一個好辦法是在你的'receive_info()'應對讀取,然後啓動一個線程來完成實際的處理。執行此線程需要花費的時間,但它不會阻止您的主循環。 – Hannu

回答

1

我是不是能夠運行您的代碼和測試,但如果你的問題是receive_info()攔截,你會繞過通過啓動一個線程來處理實際的響應。像這樣的東西(可能包含錯別字,我不能與你的代碼來測試 - 例如不知道什麼flow是)

def handle_response(self, data): 
    logging.info("[received_data] data <{}>".format(flow)) 
    self.rep_socket.send(b"\x00") 
    self.blocking_op(data) 

def received_info(self): 
     while True: 
      # logging.debug("[received_flow] ") 
      cl_data = self.rep_socket.recv_json() 
      _t = threading.Thread(target=self.handle_response, args=(cl_data,)) 
      _t.start() 

這有你的received_info()循環,因爲它是,而是做着處理在那裏,啓動一個新線程來處理響應。它需要花費什麼才能完成,然後線程死亡,但是您的received_info()將立即準備好等待新的響應。

+0

非常感謝Hannu,它工作。順便說一句,在args =(cl_data,)cl_data之後爲什麼會有昏迷? 一個額外的問題:你想,如果我要處理的說1000個是客戶更好使用線程或使用GEVENT(或ASYNCIO)? – Corey

+0

逗號在那裏,因爲你只傳遞一個參數,'args'必須是一個元組。如果你傳遞了多個參數,你可以聲明args =(a,b,c)而不用尾隨的逗號,但它是從一個項目中創建一個元組的最簡單的方法。 – Hannu

+0

我不是asyncio的專家,所以無法評論性能。由於GIL,Python並不是最有效的並行處理語言。嘗試線程,如果有問題,請調查。線程可能是絕對好的。 – Hannu