2011-10-29 39 views
12

我想寫一個類,它將使用多個進程計算校驗和,從而利用多個內核。我有一個非常簡單的類,它在執行一個簡單的案例時效果很好。但是每當我創建兩個或更多類的實例時,工作者就不會退出。看起來它從來沒有收到管道被父母關閉的消息。使用python多處理管道

所有的代碼可以在下面找到。我首先分別計算md5和sha1校驗和,然後我嘗試並行執行計算,然後在關閉管道時鎖定程序。

這是怎麼回事?爲什麼不按照我的預期工作?我想我可以通過在隊列上發送一個「停止」消息來做一個解決方法,並讓孩子退​​出這種方式,但我真的很想知道爲什麼這不工作。

import multiprocessing 
import hashlib 

class ChecksumPipe(multiprocessing.Process): 
    def __init__(self, csname): 
     multiprocessing.Process.__init__(self, name = csname) 
     self.summer = eval("hashlib.%s()" % csname) 
     self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) 
     self.result_queue = multiprocessing.Queue(1) 
     self.daemon = True 
     self.start() 
     self.child_conn.close() # This is the parent. Close the unused end. 

    def run(self): 
     self.parent_conn.close() # This is the child. Close unused end. 
     while True: 
      try: 
       print "Waiting for more data...", self 
       block = self.child_conn.recv_bytes() 
       print "Got some data...", self 
      except EOFError: 
       print "Finished work", self 
       break 
      self.summer.update(block) 
     self.result_queue.put(self.summer.hexdigest()) 
     self.result_queue.close() 
     self.child_conn.close() 

    def update(self, block): 
     self.parent_conn.send_bytes(block) 

    def hexdigest(self): 
     self.parent_conn.close() 
     return self.result_queue.get() 


def main(): 
    # Calculating the first checksum works 
    md5 = ChecksumPipe("md5") 
    md5.update("hello") 
    print "md5 is", md5.hexdigest() 

    # Calculating the second checksum works 
    sha1 = ChecksumPipe("sha1") 
    sha1.update("hello") 
    print "sha1 is", sha1.hexdigest() 

    # Calculating both checksums in parallel causes a lockup! 
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1") 
    md5.update("hello") 
    sha1.update("hello") 
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here! 

main() 

PS。此問題已得到解決這裏是上述代碼的工作版本,如果有人有興趣:

import multiprocessing 
import hashlib 

class ChecksumPipe(multiprocessing.Process): 

    all_open_parent_conns = [] 

    def __init__(self, csname): 
     multiprocessing.Process.__init__(self, name = csname) 
     self.summer = eval("hashlib.%s()" % csname) 
     self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) 
     ChecksumPipe.all_open_parent_conns.append(self.parent_conn) 
     self.result_queue = multiprocessing.Queue(1) 
     self.daemon = True 
     self.start() 
     self.child_conn.close() # This is the parent. Close the unused end. 

    def run(self): 
     for conn in ChecksumPipe.all_open_parent_conns: 
      conn.close() # This is the child. Close unused ends. 
     while True: 
      try: 
       print "Waiting for more data...", self 
       block = self.child_conn.recv_bytes() 
       print "Got some data...", self 
      except EOFError: 
       print "Finished work", self 
       break 
      self.summer.update(block) 
     self.result_queue.put(self.summer.hexdigest()) 
     self.result_queue.close() 
     self.child_conn.close() 

    def update(self, block): 
     self.parent_conn.send_bytes(block) 

    def hexdigest(self): 
     self.parent_conn.close() 
     return self.result_queue.get() 

def main(): 
    # Calculating the first checksum works 
    md5 = ChecksumPipe("md5") 
    md5.update("hello") 
    print "md5 is", md5.hexdigest() 

    # Calculating the second checksum works 
    sha1 = ChecksumPipe("sha1") 
    sha1.update("hello") 
    print "sha1 is", sha1.hexdigest() 

    # Calculating both checksums also works fine now 
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1") 
    md5.update("hello") 
    sha1.update("hello") 
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() 

main() 
+0

您可能希望在'self.parent_conn.close()'之後添加'ChecksumPipe.all_open_parent_conns.remove(self.parent_conn)'來讓連接被銷燬。 –

+0

'self.summer = eval(「hashlib。%s()」%csname)'看起來很醜。怎麼樣'self.summer = getattr(hashlib,csname)()'? – glglgl

回答

7

是的,這是令人驚訝的行爲確實如此。

但是,如果您查看兩個並行子進程的輸出lsof,很容易注意到第二個子進程打開了更多的文件描述符。

會發生什麼情況是,當兩個並行子進程開始時,第二個子進程繼承父進程的管道,因此當父進程調用self.parent_conn.close()時,第二個子進程仍然打開管道文件描述符,以便管道文件描述不會在內核中關閉(引用計數大於0),其效果是在第一個並行子進程中的self.child_conn.recv_bytes()從不從read() s EOFEOFError永遠不會被拋出。

您可能需要發送明確的關閉消息,而不是關閉文件描述符,因爲似乎很少控制哪些文件描述符在哪些進程之間共享(沒有關閉文件描述符標誌)。

+0

謝謝!這清理了我的東西。我在我的例子中通過使用包含所有實例中的所有打開連接的共享類變量來解決這個問題,以便孩子們可以關閉所有不需要的套接字。 –