2017-07-14 33 views
0

我創建了一個程序,它可以被總結到這樣的多進程池:終止時,一名工人發現,妥善解決

from itertools import combinations 
class Test(object): 
    def __init__(self, t2): 
     self.another_class_object = t2 

def function_1(self,n): 
    a = 2 
    while(a <= n): 
     all_combs = combinations(range(n),a) 
     for comb in all_combs: 
      if(another_class_object.function_2(comb)): 
       return 1 
     a += 1 
    return -1 

功能combinationsitertools進口。 Function_2返回TrueFalse取決於輸入和在另一個類對象的方法,例如:

class Test_2(object): 

def __init__(self, list): 
    self.comb_list = list 

def function_2(self,c): 
    return c in self.comb_list 

一切工作得很好。但現在我想稍微改變它並實現多處理。我發現this topic,顯示瞭如何退出腳本的例子,當工作進程的一個決定沒有更多的工作需要做。所以我做了以下修改:

  1. 加入池的定義爲__init__方法:self.pool = Pool(processes=8)
  2. 創建一個回調函數:

    all_results = [] 
    def callback_function(self, result): 
        self.all_results.append(result) 
        if(result): 
         self.pool.terminate() 
    
  3. 改變function_1

    def function_1(self,n): 
        a = 2 
        while(a <= n): 
         all_combs = combinations(range(n),a) 
         for comb in all_combs: 
          self.pool.apply_async(self.another_class_object.function_2, args=comb, callback=self.callback_function) 
         #self.pool.close() 
         #self.pool.join() 
         if(True in all_results): 
          return 1 
         a += 1 
        return -1 
    

不幸的是,它不能像我預期的那樣工作。爲什麼?調試後,它看起來像回調函數永遠不會到達。我認爲每個工人都會接觸到它。我錯了嗎?可能是什麼問題?

+0

對於初學者,不要將實例方法傳遞給'multiprocessing.Pool'工具(或多處理代碼),因爲這是一個不想打開的包。 – zwer

回答

1

我沒有嘗試你的代碼,這樣,但是我想你的結構。你確定問題出在回調函數中,而不是工作者函數?如果函數是一個類方法,我沒有設法讓apply_async啓動worker函數的一個實例。它只是沒有做任何事情。 Apply_async完成而沒有錯誤,但它不實現該工人。

只要我移動的工作器功能(在你的情況another_class_object.function2)作爲一個獨立的全局函數外的類,它開始按預期工作,回調是正常觸發。相反,回調函數似乎可以很好地用作類方法。

似乎有關於這個例如在這裏討論:Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?

這是任何方式有用嗎?

哈努哈利

1

Question: ... not work as I expected. ... What can be the problem?

它總是需要從get()pool.apply_async(...結果看到錯誤從池進程。

更改爲以下:

pp = [] 
for comb in all_combs: 
    pp.append(pool.apply_async(func=self.another_class_object.function_2, args=comb, callback=self.callback_function)) 

pool.close() 

for ar in pp: 
    print('ar=%s' % ar.get()) 

And you will see this Error:

TypeError: function_2() takes 2 positional arguments but 3 were given 

修正了這個錯誤,改變args=combargs=(comb,)

pp.append(pool.apply_async(func=self.another_class_object.function_2, args=(comb,), callback=self.callback_function)) 

與Python測試:3.4。2