2011-08-17 59 views
2

這些代碼是我的代理服務器程序的一部分,它的功能是創建socket和fork四個進程來逐個接受。共享偵聽套接字的多個進程:當新進程進入時,爲什麼舊​​進程停止?

在我的程序中,我使用gevent模型來調度我的所有函數,並在將其更改爲多個進程之前,我的程序是正確的。但現在當我使用第二個進程時,第一個進程停止運行,我沒有發現哪裏出錯,可能是「接受」函數或我的事件是停止調度。

它已經困擾了我兩天,我希望有人能幫助我。

順便說一句,我的英文很差,我盡我所能解釋它,希望你能理解。

class Client(object): 
    def __init__(self, ent, ev): 
     ... 

    def receive(self): 
     ... 
     if "Content-Length" in dic: 
      self.ent_s_send = core.event(core.EV_WRITE, 
             self.conn.fileno(), 
             self.ser_send, 
             [self.conn,self.body] 
             ) 
      self.recv_ent = core.event(core.EV_READ, 
             self.sock.fileno(), 
             self.recv_content 
            ) 
      self.recv_ent.add() 
     ... 

    def recv_content(self, ent, ev): 
     ... 
     self.n = self.sock.recv_into(self.msg, 
            min(self.total-self.num, 20000), 
            socket.MSG_DONTWAIT) 

     **time.sleep(0.1)** 
     #if i add it here to let the event slow down the problem solved, how it could be? 

     self.num += self.n 
     self.msg_buffer.fromstring(self.msg.tostring()[:self.n]) 
     ... 
     if self.total > self.num: #if not the last msg continue recving and sending... 
      self.ent_s_send.add() 
      self.recv_ent.add() 
     ... 

    def ser_send(self, ent, ev): 
     ... 
     num = self.conn.send(self.msg_buffer,socket.MSG_DONTWAIT) 
     ... 
     self.msg_buffer = self.msg_buffer[num:] 

... 
... 

class Server(object): 
    def __init__(self): 
     self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.sock.bind(('localhost', 18001)) 
     self.sock.listen(50) 
     self.mutex = multiprocessing.Lock() 

    def loop(self,): 

     for i in range(0,4): 
      pid = os.fork() 
      if pid == 0 or pid == -1: 
       break 

     if pid == -1: 
      print "Fork failed!!!" 
      sys.exit() 

     elif pid == 0: **# create four child ps to accept the socket!** 
      print "Child PID = %d" % os.getpid() 
      core.init() 
      self.event = core.event(core.EV_READ, 
           self.sock.fileno(), 
           self.onlink) 
      self.event.add() 
      core.dispatch() 

     else: 
      os.wait() 

    def onlink(self, ent, ev): 
     self.mutex.acquire() 
     print 'Accept PID = %s' % os.getpid() 
     try: 
      self.conn, self.addr = self.sock.accept() 
      **#I think 'accept' is the the problem, but I cannot see how.** 

     except socket.error, why: 
      if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]: 
       return 
      else: 
       raise 
     print self.sock,self.conn,self.addr 
     self.mutex.release() 
     print 'Release PID = %s' % os.getpid() 
     cc = Chat(self.conn, self.sock) 
     self.event.add() 



if __name__ == '__main__': 

    s1 = Server() 
    s1.loop() 
+2

請不要只是粘貼你的代碼。通過編輯您的帖子來添加它,並嘗試向您的代碼添加註釋,以便我們知道在哪裏查找問題,從而更詳細地描述您的問題。 – agf 2011-08-17 10:22:45

+0

你確實不需要互斥鎖。他們在過去使用它來避免'accept()`的雷鳴羣體問題,但它早已修復。 – 2011-08-17 16:07:33

回答

1

accept()blocking call。它將無限期地等待客戶端連接。因爲您完全鎖定了所有其他併發進程,因此在這種阻止操作中持有互斥鎖是Bad IdeaTM

此外,正如@Maxim在評論中指出的那樣,您並不需要鎖定accept()。讓操作系統仲裁出入連接並將它們分派給你的進程。