我正在學習 python multiprocessing 模塊,但遇到了一些問題。我對線程(threading)模塊非常熟悉,但我需要確保這些進程是真正並行運行的。Python多處理 - AssertionError:只能加入子進程
下面是我想要做的事情的概要。請忽略諸如未聲明的變量/函數之類的內容,因爲我無法完整地粘貼我的代碼。
import multiprocessing
import time
def wrap_func_to_run(host, args, output):
    """Process target: run do_something(host, args) and append the result to `output`.

    NOTE(review): when this is used as a multiprocessing.Process target,
    `output` is a pickled copy living in the child's address space, so the
    append is never visible to the parent. Return results through a
    multiprocessing.Queue (or Manager().list()) instead.
    """
    output.append(do_something(host, args))
    return
def func_to_run(host, args):
    """Synchronous variant: run do_something and hand its result straight back."""
    outcome = do_something(host, args)
    return outcome
def do_work(server, client, server_args, client_args):
    """Run the server job, then the client job, sequentially in this process.

    NOTE(review): `result` is never assigned here -- per the question text,
    undeclared names are intentional placeholders for elided code.
    """
    server_output = func_to_run(server, server_args)
    client_output = func_to_run(client, client_args)
    #handle this output and return a result
    return result
def run_server_client(server, client, server_args, client_args, server_output, client_output):
    """Run the server and client jobs in two child processes and wait for both.

    Both Process objects are created, started, and joined inside this one
    function, so these joins are legal: this function owns its children.

    NOTE(review): appends made by the children to server_output/client_output
    happen in the children's own memory and never reach the caller; also note
    this function returns None despite the trailing comment.
    """
    server_process = multiprocessing.Process(target=wrap_func_to_run, args=(server, server_args, server_output))
    server_process.start()
    client_process = multiprocessing.Process(target=wrap_func_to_run, args=(client, client_args, client_output))
    client_process.start()
    server_process.join()
    client_process.join()
    #handle the output and return some result
def run_in_parallel(server, client):
    """Start run_server_client in a child process while do_work runs here.

    Returns the placeholder `final_result` (intentionally undeclared, per the
    question text).
    """
    #set up commands for first process
    # BUG FIX: the original `server_output = client_output = []` bound BOTH
    # names to the SAME list object, so server and client results could never
    # be distinguished. Give each name its own list.
    server_output = []
    client_output = []
    server_cmd = "cmd"
    client_cmd = "cmd"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, server_output, client_output))
    process_one.start()
    #set up second process to run - but this one can run here
    result = do_work(server, client, "some server args", "some client args")
    process_one.join()
    #use outputs above and the result to determine result
    return final_result
def main():
    """Entry point of the sketch: build the two endpoints, run them in parallel.

    NOTE(review): `client = client()` calls an undeclared factory and rebinds
    the same name over it (likewise `server`) -- placeholders per the question.
    """
    #grab client
    client = client()
    #grab server
    server = server()
    return run_in_parallel(server, client)
if __name__ == "__main__":
    main()
這裏是我得到的錯誤:
Error in sys.exitfunc:
Traceback (most recent call last):
File "/usr/lib64/python2.7/atexit.py", line 24, in _run_exitfuncs
func(*targs, **kargs)
File "/usr/lib64/python2.7/multiprocessing/util.py", line 319, in _exit_function
p.join()
File "/usr/lib64/python2.7/multiprocessing/process.py", line 143, in join
assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process
我已經嘗試了很多不同的方法來解決這個問題,但我的感覺是,我使用這個模塊的方式本身有問題。
編輯:
所以我寫了一個文件,通過模擬客戶端/服務器以及它們所做的工作來重現這個問題——我之前還遺漏了一個重要信息:我是在 UNIX 上運行的。另一個重要的信息是,在我的實際案例中,do_work
內部使用了 os.fork()
。如果不使用 os.fork()
,我就無法重現這個錯誤,所以我猜問題出在那裏。在真實場景中,這部分代碼不是我寫的,我一直把它當作黑盒來對待(這可能是我的錯誤)。總之,下面是重現問題的代碼——
#!/usr/bin/python
import multiprocessing
import time
import os
import signal
import sys
class Host():
    """Base type for the simulated endpoints: a name plus a work() hook."""

    def __init__(self):
        self.name = "host"

    def work(self):
        # Subclasses (Server/Client) override this to simulate a workload.
        pass
class Server(Host):
    """Simulated server: counts to 10000, printing each step, then sleeps 1s.

    (Python 2 print statements -- this listing is Python 2 code.)
    """
    def __init__(self):
        self.name = "server"
    def work(self):
        # Print-heavy busy loop standing in for real server work.
        x = 0
        for i in range(10000):
            x+=1
            print x
        time.sleep(1)
class Client(Host):
    """Simulated client: counts to 5000, printing each step, then sleeps 1s."""
    def __init__(self):
        self.name = "client"
    def work(self):
        # Shorter busy loop than Server, so the client finishes first.
        x = 0
        for i in range(5000):
            x+=1
            print x
        time.sleep(1)
def func_to_run(host, args):
    """Run one host's simulated workload, announcing start/finish; returns "done"."""
    print host.name + " is working"
    host.work()
    print host.name + ": " + args
    return "done"
def do_work(server, client, server_args, client_args):
    """Run the server job in a raw os.fork() child while the client runs here.

    NOTE(review): this fork is the likely source of the AssertionError in the
    question. The forked child inherits the parent's multiprocessing.Process
    objects (process_one in run_in_parallel); when the child exits,
    multiprocessing's atexit cleanup tries to join() them, but they are not
    the child's own children -- hence "can only join a child process"
    (matches the atexit traceback shown above).

    NOTE(review): sys.exit(server_output) exits with the STRING "done", which
    Python prints to stderr and converts to exit status 1; and the parent's
    server_output is never updated (the fork child has separate memory), so
    the final comparison `server_output == "done"` is always False here.
    """
    print "in do_work"
    server_output = client_output = ""
    child_pid = os.fork()
    if child_pid == 0:
        # Child branch: run the server workload, then terminate the child.
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1)
    client_output = func_to_run(client, client_args)
    # kill and wait for server to finish
    os.kill(child_pid, signal.SIGTERM)
    (pid, status) = os.waitpid(child_pid, 0)
    return (server_output == "done" and client_output =="done")
def run_server_client(server, client, server_args, client_args):
    """Run server and client workloads in two multiprocessing children, join both.

    Processes are created and joined in this same function, so the joins are
    legal. Return values of func_to_run are discarded (no result channel).
    """
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args))
    print "Starting server process"
    server_process.start()
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args))
    print "Starting client process"
    client_process.start()
    print "joining processes"
    server_process.join()
    client_process.join()
    print "processes joined and done"
def run_in_parallel(server, client):
    """Run one run_server_client round in a child process while do_work runs here.

    NOTE(review): process_one is still alive (and registered with
    multiprocessing's atexit cleanup) at the moment do_work() calls os.fork()
    below; the fork child inherits that Process object, which is what trips
    the "can only join a child process" assertion at the child's exit.
    """
    #set up commands for first process
    server_cmd = "server command for run_server_client"
    client_cmd = "client command for run_server_client"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd))
    print "Starting process one"
    process_one.start()
    #set up second process to run - but this one can run here
    print "About to do work"
    result = do_work(server, client, "server args from do work", "client args from do work")
    print "Joining process one"
    process_one.join()
    #use outputs above and the result to determine result
    print "Process one has joined"
    return result
def main():
    """Build the simulated endpoints and run the full parallel scenario."""
    # Construction order is irrelevant: both __init__s only set a name.
    srv = Server()
    cli = Client()
    return run_in_parallel(srv, cli)


if __name__ == "__main__":
    main()
如果我刪除 do_work
中對 os.fork()
的使用,就不會再出現這個錯誤,代碼的行爲也和我原本預期的一樣(除了輸出傳遞的問題,那一點我已經接受是我自己的錯誤/誤解)。我可以把舊代碼改成不使用 os.fork(),但我也想知道它爲什麼會導致這個問題,以及是否有可行的解決方案。
編輯2:
在採納被接受的答案之前,我先去掉了 os.fork(),讓那個方案能夠工作。下面是我的代碼,其中做了一些調整,可以模擬不同的工作量——
#!/usr/bin/python
import multiprocessing
import time
import os
import signal
import sys
from Queue import Empty
class Host():
    """Base endpoint for the simulation: a name plus an overridable work(w) hook."""

    def __init__(self):
        self.name = "host"

    def work(self, w):
        # Subclasses override this; `w` sizes the simulated workload.
        pass
class Server(Host):
    """Simulated server: counts to w, printing each step, then sleeps 1s."""
    def __init__(self):
        self.name = "server"
    def work(self, w):
        # `w` parameterizes the workload (see run_in_parallel: 400000000 vs 5000).
        x = 0
        for i in range(w):
            x+=1
            print x
        time.sleep(1)
class Client(Host):
    """Simulated client: same print-loop workload as Server, sized by w."""
    def __init__(self):
        self.name = "client"
    def work(self, w):
        x = 0
        for i in range(w):
            x+=1
            print x
        time.sleep(1)
def func_to_run(host, args, w, q):
    """Run one host's workload of size w and report completion on queue q.

    Puts the "ZERO" sentinel on q so the parent can detect success through
    handle_queue(); the "done" return value only matters for synchronous calls.
    """
    print host.name + " is working"
    host.work(w)
    print host.name + ": " + args
    q.put("ZERO")
    return "done"
def handle_queue(queue):
    """Drain `queue` without blocking; return 0 if every item is "ZERO", else 1."""
    done = False
    results = []
    return_val = 0
    while not done:
        #try to grab item from Queue
        tr = None
        try:
            tr = queue.get_nowait()
            print "found element in queue"
            print tr
        except Empty:
            # Queue drained; stop polling.
            done = True
        if tr is not None:
            results.append(tr)
    for el in results:
        if el != "ZERO":
            # Any non-sentinel item marks the whole run as failed.
            return_val = 1
    return return_val
def do_work(server, client, server_args, client_args):
    """Old os.fork()-based variant; its only call site is now commented out.

    NOTE(review): effectively dead code in this version -- func_to_run now
    takes (host, args, w, q), so these two-argument calls would raise
    TypeError if this ever ran again. Consider deleting it once the
    queue-based path is confirmed.
    """
    print "in do_work"
    server_output = client_output = ""
    child_pid = os.fork()
    if child_pid == 0:
        # Child branch: run the server workload, then terminate the child.
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1)
    client_output = func_to_run(client, client_args)
    # kill and wait for server to finish
    os.kill(child_pid, signal.SIGTERM)
    (pid, status) = os.waitpid(child_pid, 0)
    return (server_output == "done" and client_output =="done")
def run_server_client(server, client, server_args, client_args, w, mq):
    """Run server and client workloads of size w in two child processes.

    Each child reports on a private multiprocessing.Queue; if both reported
    the "ZERO" sentinel (handle_queue returns 0), forward one "ZERO" to the
    caller's master queue `mq`.
    """
    local_queue = multiprocessing.Queue()
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args, w, local_queue))
    print "Starting server process"
    server_process.start()
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args, w, local_queue))
    print "Starting client process"
    client_process.start()
    print "joining processes"
    server_process.join()
    client_process.join()
    print "processes joined and done"
    if handle_queue(local_queue) == 0:
        mq.put("ZERO")
def run_in_parallel(server, client):
    """Run two run_server_client rounds: one in a child process, one inline.

    All worker results funnel into master_queue; handle_queue() reduces them
    to a single integer (0 means every worker reported "ZERO"). This version
    replaces the fork()-based do_work call (left commented out below).
    """
    #set up commands for first process
    master_queue = multiprocessing.Queue()
    server_cmd = "server command for run_server_client"
    client_cmd = "client command for run_server_client"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, 400000000, master_queue))
    print "Starting process one"
    process_one.start()
    #set up second process to run - but this one can run here
    print "About to do work"
    #result = do_work(server, client, "server args from do work", "client args from do work")
    run_server_client(server, client, "server args from do work", "client args from do work", 5000, master_queue)
    print "Joining process one"
    process_one.join()
    #use outputs above and the result to determine result
    print "Process one has joined"
    return_val = handle_queue(master_queue)
    print return_val
    return return_val
def main():
    """Drive the simulation; a nonzero val (non-"ZERO" queue item seen) is failure."""
    #grab client
    client = Client()
    #grab server
    server = Server()
    val = run_in_parallel(server, client)
    if val:
        print "failed"
    else:
        print "passed"
    return val
if __name__ == "__main__":
    main()
這段代碼有一些微調的打印只是爲了看看到底發生了什麼。我使用了一個multiprocessing.Queue來存儲和分享整個流程的輸出,並返回到我的主線程中進行處理。我認爲這解決了我的問題的python部分,但是我正在處理的代碼中仍然存在一些問題。我唯一能說的是,相當於func_to_run
涉及通過ssh發送一個命令,並隨輸出一起抓取任何err。出於某種原因,這對於執行時間較短的命令來說工作得很好,但對於執行時間/輸出大得多的命令來說效果不佳。我試着在我的代碼中使用完全不同的工作值來模擬這個,但一直沒有能夠重現類似的結果。
編輯3:我使用的(同樣不是我寫的)庫代碼通過 Popen.wait()
來執行 SSH 命令,而我剛剛讀到:
Popen.wait()
Wait for child process to terminate. Set and return returncode attribute.Warning This will deadlock when using stdout=PIPE and/or stderr=PIPE and the >child process generates enough output to a pipe such that it blocks waiting for >the OS pipe buffer to accept more data. Use communicate() to avoid that.
我調整了代碼,讓它不做緩衝,而是在收到輸出時立即打印,之後一切正常。
這裏有多個問題。首先:在使用'multiprocessing'模塊時,'output.append()'可能無法達到你想要的效果。其次,你所報告的問題在於:你不能對不屬於當前執行進程的'Process'調用'.join()'。你也承認你的例子是人爲構造的,所以很難說出問題到底出在哪裏。你確定你像本例一樣把'Process()'的結果賦值給了一個短暫的局部變量嗎?還是說你用的是全局變量或實例變量(例如'self.process_one')? –
對於第一個問題 - 你是對的。它在我使用線程時工作,但可能是因爲共享內存空間。進程的變量是本地的。這些進程是否不屬於調用進程?我只是在創建它們的函數中加入進程,所以我會假設所有權是可以的,除非有一些調度問題。 –
啊!我現在明白了——你是在'atexit'處理程序中調用'join',但那只是因爲'multiprocessing'試圖自行清理。我推測這是把一個'Process'實例傳遞給了另一個進程導致的。如果是這樣,這在我看來是CPython中的一個微妙的bug:'Process'應該拒絕被'pickle',並在你嘗試傳遞它時拋出異常。我知道構造一個最小可重現示例並不容易,但我認爲這值得你花時間——它會幫助你和我們找出觸發這個錯誤的關鍵因素。 –