0

我有創建子進程,使用未來接收結果,然後殺死其中一些需要時的要求。集成multiprocessing.Process與concurrent.future._base.Future

爲此,我分類了multiprocessing.Process類,並從start()方法返回Future對象。

問題是我無法在cb()函數中接收到結果,因爲它永遠不會被調用。

請幫助/建議如果這可以用其他方式完成,或者我在當前實現中丟失了什麼?

以下是我目前的做法

from multiprocessing import Process, Queue 
from concurrent.futures import _base 
import threading 
from time import sleep 


def foo(x,q): 
    print('result {}'.format(x*x)) 
    result = x*x 
    sleep(5) 
    q.put(result) 


class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = _base.Future() 

    def run(self): 
     q = Queue() 
     worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,))) 
     worker_thread.start() 
     r = q.get(block=True) 
     print('setting result {}'.format(r)) 
     self.f.set_result(result=r) 
     print('done setting result') 

    def start(self): 
     f = _base.Future() 
     run_thread = threading.Thread(target=self.run) 
     run_thread.start() 
     return f 


def cb(future): 
    print('received result in callback {}'.format(future)) 


def main(): 
    p1 = MyProcess(target=foo, args=(2,)) 
    f = p1.start() 
    f.add_done_callback(fn=cb) 

    sleep(10) 


if __name__ == '__main__': 

    main() 

    print('Main thread dying') 

回答

1

在你開始你的方法創建一個新的未來,你再回來。這是你設定結果的不同未來,這個未來根本就沒有用過。嘗試:

def start(self): 
    run_thread = threading.Thread(target=self.run) 
    run_thread.start() 
    return self.f 

但是,您的代碼存在更多問題。您覆蓋該進程的start方法,將其替換爲工作線程上的執行,因此實際上繞過了多處理。另外,您不應該導入_base模塊,即從前導下劃線中看到的實現細節。你應該導入concurrent.futures.Future(這是相同的類,但通過公共API)。

這確實使用多:

from multiprocessing import Process, Queue 
from concurrent.futures import Future 
import threading 
from time import sleep 


def foo(x,q): 
    print('result {}'.format(x*x)) 
    result = x*x 
    sleep(5) 
    q.put(result) 

class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = Future() 

    def run(self): 
     q = Queue() 
     worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,))) 
     worker_thread.start() 
     r = q.get(block=True) 
     print('setting result {}'.format(r)) 
     self.f.set_result(result=r) 
     print('done setting result') 

def cb(future): 
    print('received result in callback {}: {}'.format(future, future.result())) 

def main(): 
    p1 = MyProcess(target=foo, args=(2,)) 
    p1.f.add_done_callback(fn=cb) 
    p1.start() 
    p1.join() 
    sleep(10) 

if __name__ == '__main__': 
    main() 
    print('Main thread dying') 

而你在一個新的進程,現在已經是,產卵一個工作線程來執行你的目標函數真的不應該是必要的,你可以只執行你的目標函數直接代替。如果目標函數引發一個你不知道的異常,你的回調只會在成功時被調用。所以如果你解決了這個問題,那麼你只需要:

from multiprocessing import Process 
from concurrent.futures import Future 
import threading 
from time import sleep 


def foo(x): 
    print('result {}'.format(x*x)) 
    result = x*x 
    sleep(5) 
    return result 

class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = Future() 

    def run(self): 
     try: 
      r = self.target(*self.args) 
      print('setting result {}'.format(r)) 
      self.f.set_result(result=r) 
      print('done setting result') 
     except Exception as ex: 
      self.f.set_exception(ex) 

def cb(future): 
    print('received result in callback {}: {}'.format(future, future.result())) 

def main(): 
    p1 = MyProcess(target=foo, args=(2,)) 
    p1.f.add_done_callback(fn=cb) 
    p1.start() 
    p1.join() 
    sleep(10) 

if __name__ == '__main__': 
    main() 
    print('Main thread dying') 

這基本上就是ProcessPoolExecutor所做的。

+0

感謝您的回覆。這是一個非常愚蠢的錯誤。感謝您的注意! :) –