2015-02-23 68 views
3

我有一份我希望分發給池中工作人員的任務列表。我想達到兩個目的:多處理:池:等待所有結果,但立即處理單個結果

  1. 當工人完成,過程,結果立即
  2. 有一個簡單的方法來等待所有的工人來完成。

使用fapply_async,我可以輕鬆實現第一個目標。只要工作人員完成,回調就會被調用。但是,爲了實現第二個目標,我能想出的唯一解決方案基本上只是輪詢AsyncResults,直到它們都準備好()。

使用map_async,我可以輕鬆實現第二個目標。但是,只有當所有工作人員完成後,回調纔會被調用一次。我相信我理解這個的原因(結果的順序是相關的)。

是否有一些解決方案我錯過了將實現目標1和2?

這裏是我的測試代碼:

#!/usr/bin/python3 

import multiprocessing 
import time 
import random 

def worker(src): 
    time.sleep(0.2) 
    # src is apply_async or map_async 
    return (src, random.randint(1, 100)) 

def map_async_example(): 
    tasks = ['map_async'] * 20 
    with multiprocessing.Pool(processes=4) as pool: 
     r = pool.map_async(worker, tasks, callback=print) 
     r.wait() 

def fapply_async_example(): 
    tasks = [('fapply_async',)] * 20 
    with multiprocessing.Pool(processes=4) as pool: 
     ars = [] 
     for t in tasks: 
      ar = pool.apply_async(worker, t, callback=print) 
      ars.append(ar) 
     # Wait for all AsyncResults to become ready() 
     while len(ars) > 0: 
      time.sleep(0.5) 
      # Keep only the not-ready results 
      ars = [ar for ar in ars if not ar.ready()] 

def main(): 
    # One list of 20 results 
    print('===============') 
    print('Using map_async') 
    print('===============') 
    map_async_example() 

    # 20 results 
    print('==================') 
    print('Using fapply_async') 
    print('==================') 
    fapply_async_example() 

if __name__ == '__main__': 
    main() 

回答

3

也許我失去了一些東西,但爲什麼不只是做你的處理,然後join()末他們呢?

+0

是的,它做到了。謝謝!我錯誤地解釋了之前必要的close()函數的效果,所以我沒有嘗試。 – Duoran 2015-02-23 13:30:43