Python multiprocessing - AssertionError: can only join a child process

I'm getting into the Python multiprocessing module and I've run into some problems. I'm very familiar with the threading module, but I need to make sure the processes I'm launching actually run in parallel.

Below is an outline of what I'm trying to do. Please ignore things like undeclared variables and functions, because I can't paste my code in full.

import multiprocessing 
import time 

def wrap_func_to_run(host, args, output): 
    output.append(do_something(host, args)) 
    return 

def func_to_run(host, args): 
    return do_something(host, args) 

def do_work(server, client, server_args, client_args): 
    server_output = func_to_run(server, server_args) 
    client_output = func_to_run(client, client_args) 
    #handle this output and return a result 
    return result 

def run_server_client(server, client, server_args, client_args, server_output, client_output): 
    server_process = multiprocessing.Process(target=wrap_func_to_run, args=(server, server_args, server_output)) 
    server_process.start() 
    client_process = multiprocessing.Process(target=wrap_func_to_run, args=(client, client_args, client_output)) 
    client_process.start() 
    server_process.join() 
    client_process.join() 
    #handle the output and return some result  

def run_in_parallel(server, client): 
    #set up commands for first process 
    server_output = client_output = []  # note: both names bind the same list object
    server_cmd = "cmd" 
    client_cmd = "cmd" 
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, server_output, client_output)) 
    process_one.start() 
    #set up second process to run - but this one can run here 
    result = do_work(server, client, "some server args", "some client args") 
    process_one.join() 
    #use outputs above and the result to determine result 
    return final_result 

def main(): 
    #grab client 
    client = client() 
    #grab server 
    server = server() 
    return run_in_parallel(server, client) 

if __name__ == "__main__": 
    main() 

Here is the error I'm getting:

Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib64/python2.7/multiprocessing/util.py", line 319, in _exit_function
    p.join()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 143, in join
    assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process

I've tried a lot of different things to fix this, but my feeling is that there's something wrong with the way I'm using this module.

EDIT:

So I created a file that reproduces this by simulating the client/server and the work they do - I also missed an important point, which is that I'm running this on UNIX. Another important piece of information is that do_work in my actual case involves using os.fork(). I wasn't able to reproduce the error without using os.fork(), so I'm assuming the problem lies there. In my real-world case that part of the code isn't mine, so I've been treating it as a black box (likely a mistake on my part). Anyway, here's the code that reproduces it -

#!/usr/bin/python 

import multiprocessing 
import time 
import os 
import signal 
import sys 

class Host():
    def __init__(self):
        self.name = "host"

    def work(self):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self):
        x = 0
        for i in range(10000):
            x += 1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self):
        x = 0
        for i in range(5000):
            x += 1
        print x
        time.sleep(1)

def func_to_run(host, args): 
    print host.name + " is working" 
    host.work() 
    print host.name + ": " + args 
    return "done" 

def do_work(server, client, server_args, client_args): 
    print "in do_work" 
    server_output = client_output = "" 
    child_pid = os.fork() 
    if child_pid == 0: 
     server_output = func_to_run(server, server_args) 
     sys.exit(server_output) 
    time.sleep(1) 

    client_output = func_to_run(client, client_args) 
    # kill and wait for server to finish 
    os.kill(child_pid, signal.SIGTERM) 
    (pid, status) = os.waitpid(child_pid, 0) 

    return (server_output == "done" and client_output =="done") 

def run_server_client(server, client, server_args, client_args): 
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args)) 
    print "Starting server process" 
    server_process.start() 
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args)) 
    print "Starting client process" 
    client_process.start() 
    print "joining processes" 
    server_process.join() 
    client_process.join() 
    print "processes joined and done" 

def run_in_parallel(server, client): 
    #set up commands for first process 
    server_cmd = "server command for run_server_client" 
    client_cmd = "client command for run_server_client" 
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd)) 
    print "Starting process one" 
    process_one.start() 
    #set up second process to run - but this one can run here 
    print "About to do work" 
    result = do_work(server, client, "server args from do work", "client args from do work") 
    print "Joining process one" 
    process_one.join() 
    #use outputs above and the result to determine result 
    print "Process one has joined" 
    return result 

def main(): 
    #grab client 
    client = Client() 
    #grab server 
    server = Server() 
    return run_in_parallel(server, client) 

if __name__ == "__main__": 
    main() 

If I remove the use of os.fork() in do_work I don't get the error, and the code behaves like I would have expected it to (except for the passing of outputs, which I've accepted as my mistake/misunderstanding). I can change the old code to not use os.fork(), but I'd also like to know why this caused the problem and whether there's a workable solution.

EDIT 2:

I had started working on a solution that omits os.fork() before the accepted answer came in. Here's what I have, with some tweaking possible on the amount of simulated work -

#!/usr/bin/python 

import multiprocessing 
import time 
import os 
import signal 
import sys 
from Queue import Empty 

class Host():
    def __init__(self):
        self.name = "host"

    def work(self, w):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self, w):
        x = 0
        for i in range(w):
            x += 1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self, w):
        x = 0
        for i in range(w):
            x += 1
        print x
        time.sleep(1)

def func_to_run(host, args, w, q): 
    print host.name + " is working" 
    host.work(w) 
    print host.name + ": " + args 
    q.put("ZERO") 
    return "done" 

def handle_queue(queue): 
    done = False 
    results = [] 
    return_val = 0 
    while not done: 
     #try to grab item from Queue 
     tr = None 
     try: 
      tr = queue.get_nowait() 
      print "found element in queue" 
      print tr 
     except Empty: 
      done = True 
     if tr is not None: 
      results.append(tr) 
    for el in results: 
     if el != "ZERO": 
      return_val = 1 
    return return_val 

def do_work(server, client, server_args, client_args):
    # NOTE: leftover from the earlier version - it's unused now (its call
    # below is commented out) and still uses the old two-argument
    # func_to_run signature.
    print "in do_work"
    server_output = client_output = "" 
    child_pid = os.fork() 
    if child_pid == 0: 
     server_output = func_to_run(server, server_args) 
     sys.exit(server_output) 
    time.sleep(1) 

    client_output = func_to_run(client, client_args) 
    # kill and wait for server to finish 
    os.kill(child_pid, signal.SIGTERM) 
    (pid, status) = os.waitpid(child_pid, 0) 

    return (server_output == "done" and client_output =="done") 



def run_server_client(server, client, server_args, client_args, w, mq): 
    local_queue = multiprocessing.Queue() 
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args, w, local_queue)) 
    print "Starting server process" 
    server_process.start() 
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args, w, local_queue)) 
    print "Starting client process" 
    client_process.start() 
    print "joining processes" 
    server_process.join() 
    client_process.join() 
    print "processes joined and done" 
    if handle_queue(local_queue) == 0: 
     mq.put("ZERO") 

def run_in_parallel(server, client): 
    #set up commands for first process 
    master_queue = multiprocessing.Queue() 
    server_cmd = "server command for run_server_client" 
    client_cmd = "client command for run_server_client" 
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, 400000000, master_queue)) 
    print "Starting process one" 
    process_one.start() 
    #set up second process to run - but this one can run here 
    print "About to do work" 
    #result = do_work(server, client, "server args from do work", "client args from do work") 
    run_server_client(server, client, "server args from do work", "client args from do work", 5000, master_queue) 
    print "Joining process one" 
    process_one.join() 
    #use outputs above and the result to determine result 
    print "Process one has joined" 
    return_val = handle_queue(master_queue) 
    print return_val 
    return return_val 

def main(): 
    #grab client 
    client = Client() 
    #grab server 
    server = Server() 
    val = run_in_parallel(server, client) 
    if val: 
     print "failed" 
    else: 
     print "passed" 
    return val 

if __name__ == "__main__": 
    main() 

This code has some tweaked printing just to see exactly what's happening. I used a multiprocessing.Queue to store and share the outputs across the processes and back into my main thread to be handled. I think this solves the Python part of my problem, but there are still some issues in the code I'm working on. The only other thing I can say is that the equivalent of func_to_run involves sending a command over ssh and grabbing any err along with the output. For some reason this works fine for commands with a short execution time, but not so well for commands with a much larger execution time/output. I tried simulating this with drastically different work values in my code here, but haven't been able to reproduce similar results.

EDIT 3: The library code I'm using (again, not mine) uses Popen.wait() for the ssh commands, and I just read this:

Popen.wait() Wait for child process to terminate. Set and return returncode attribute.

Warning This will deadlock when using stdout=PIPE and/or stderr=PIPE and the child process generates enough output to a pipe such that it blocks waiting for the OS pipe buffer to accept more data. Use communicate() to avoid that.

I adjusted the code to not buffer, and to just print output as it is received, and everything works now.
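
For reference, a minimal sketch of the difference ("seq" here is just a hypothetical stand-in for a long-output ssh command): communicate() drains the pipes while waiting, so the child never blocks on a full pipe buffer.

import subprocess

# "seq" stands in for the real long-output command.
cmd = ["seq", "1", "1000000"]

# Risky pattern: wait() never reads the pipe. Once the OS pipe buffer
# fills, the child blocks on write and wait() never returns.
#   p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
#   p.wait()  # may deadlock on large output

# Safe pattern: communicate() keeps draining stdout/stderr until EOF,
# then reaps the child and sets returncode.
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
print "returncode:", p.returncode
print "captured %d bytes of output" % len(out)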

+2

There are multiple problems here. First: output.append() probably won't do what you want when using the multiprocessing module. Second, the problem you reported is as stated: you can't call .join() on a Process that the currently executing Process doesn't own. You admit your example is synthetic, so it's hard to tell where the problem lies. Are you sure you're assigning the result of Process() to a short-lived local variable as in this example? Or are you using a global variable or an instance variable (e.g. self.process_one)? –
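
(A minimal sketch of the first point, for illustration - the worker and variable names here are made up: a plain list mutated in a forked child never becomes visible to the parent, while a multiprocessing.Queue does cross the process boundary.)

import multiprocessing

def worker(out_list, out_queue):
    # The fork()ed child gets a copy of the list; this append stays
    # in the child's address space.
    out_list.append("from child")
    # A Queue actually transfers data back to the parent.
    out_queue.put("from child")

if __name__ == "__main__":
    output = []
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(output, q))
    p.start()
    p.join()
    print output   # [] - the parent's list is untouched
    print q.get()  # 'from child'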

+0

On the first issue - you're right. It worked when I used threading, but probably because of the shared memory space; the processes' variables are local. Don't these processes belong to the calling process, though? I'm only joining the processes inside the function that creates them, so I'd assume ownership is fine unless there's some scheduling issue. –

+0

Ah! I see it now, you're calling join in an atexit handler, but only because multiprocessing is trying to clean up after itself. I would speculate this is the result of passing a Process instance to another process. If that's the case, this is IMO a subtle bug in CPython; Process should refuse to pickle and raise an exception when you try to pass it. I know creating a minimal reproducer isn't easy, but I think you'll find it worth your time - it will help you and us identify the key ingredient of the bug. –

Answers

3

I can change the old code to not use os.fork() but I'd also like to know why this caused this problem and if there's a workable solution.

The key to understanding the problem is knowing exactly what fork() does. The CPython documentation just states "Fork a child process." But that presumes you understand the C library call fork().

Here is what glibc's manpage says about it:

fork() creates a new process by duplicating the calling process. The new process, referred to as the child, is an exact duplicate of the calling process, referred to as the parent, except for the following points: ...

Basically, it's as if you took your program and made a copy of its program state (heap, stack, instruction pointer, etc.), with some small differences, and let it execute independently of the original. When this child process exits naturally, it will use exit(), and that will trigger the atexit() handlers registered by the multiprocessing module.
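
To make that concrete, here is a minimal sketch (distilled from the question's repro, names hypothetical) of the failure mode: the parent starts a multiprocessing.Process, then fork()s, and the forked child's sys.exit() runs the inherited atexit handler, which tries to join() a Process it doesn't own.

import multiprocessing
import os
import sys
import time

def sleeper():
    time.sleep(2)

if __name__ == "__main__":
    p = multiprocessing.Process(target=sleeper)
    p.start()               # p is a child of THIS process
    pid = os.fork()
    if pid == 0:
        # The forked child inherits the Process object p, but
        # p._parent_pid is still the original parent's pid. sys.exit()
        # runs atexit handlers, and multiprocessing's _exit_function
        # calls p.join() here, raising:
        #   AssertionError: can only join a child process
        sys.exit(0)
    os.waitpid(pid, 0)
    p.join()                # joining in the real parent is fine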

What can you do to avoid it?

  • omit os.fork(): use multiprocessing instead, like you're exploring now
  • might work: import multiprocessing after executing the fork(), and only in the parent or the child as needed
  • use _exit() in the child (the CPython documentation states, "Note The standard way to exit is sys.exit(n). _exit() should normally only be used in the child process after a fork().") - a sketch follows the link below

https://docs.python.org/2/library/os.html#os._exit
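
Here is a sketch of that third option applied to the question's original do_work (using the two-argument func_to_run from the first repro; the SIGTERM kill is dropped for brevity, and the child's result is passed back through its exit status, since server_output is never set in the parent anyway):

import os

def do_work(server, client, server_args, client_args):
    child_pid = os.fork()
    if child_pid == 0:
        server_output = func_to_run(server, server_args)
        # os._exit() terminates immediately and skips atexit handlers
        # (including multiprocessing's _exit_function), so inherited
        # Process objects are never join()ed from the wrong process.
        # Unlike sys.exit(), it requires an integer status.
        os._exit(0 if server_output == "done" else 1)

    client_output = func_to_run(client, client_args)
    (pid, status) = os.waitpid(child_pid, 0)
    return (os.WEXITSTATUS(status) == 0 and client_output == "done")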

+0

I'm going to accept this answer. Thank you very much for your help! I made one last edit to the question with some notes, plus the implementation I started based on your first suggestion. –

0

It seems to me that you're spawning too much at once. I wouldn't start the work from run_in_parallel in its own process; just call run_server_client with the right arguments, since it already handles the processes internally.

+0

But wouldn't that block until it's done? I need run_server_client and do_work to run at the same time, which is why I'm creating a separate process here. –