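Python: How to stop a process to release control back to the concurrent.futures pool?

I have parallelized a large CPU-bound data-processing task using concurrent.futures.ProcessPoolExecutor, as shown below: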

import concurrent.futures

with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
    futures_ocr = [
        executor.submit(MyProcessor, folder)
        for folder in sub_folders
    ]

    is_cancel = wait_for(futures_ocr)
    if is_cancel:
        print('shutting down executor')
        executor.shutdown()

def wait_for(futures):
    """Handles the future tasks after completion."""

    cancelled = False

    try:
        for future in concurrent.futures.as_completed(futures, timeout=200):
            try:
                result = future.result()
                print('successfully finished processing folder: %s' % result.source_folder_path)

            except concurrent.futures.TimeoutError:
                print('TimeoutError occurred')

            except TypeError:
                print('TypeError occurred')

    except KeyboardInterrupt:
        print('****** cancelling... *******')
        cancelled = True
        for future in futures:
            future.cancel()

    return cancelled

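For some folders the processing appears to hang, not because of a bug in the code, but because the files being processed simply take a very long time. So I want to time out these processes so that they return once they exceed a certain time limit. The pool could then use that process for the next available task.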

Adding a timeout to the as_completed() call raises this error:

Traceback (most recent call last):
  File "call_ocr.py", line 96, in <module>
    main()
  File "call_ocr.py", line 42, in main
    is_cancel = wait_for(futures_ocr)
  File "call_ocr.py", line 59, in wait_for
    for future in concurrent.futures.as_completed(futures, timeout=200):
  File "/Users/saurav/.pyenv/versions/ocr/lib/python2.7/site-packages/concurrent/futures/_base.py", line 216, in as_completed
    len(pending), len(fs)))
concurrent.futures._base.TimeoutError: 3 (of 3) futures unfinished

What am I doing wrong here, and what is the best way to make a timed-out process stop and hand its slot back to the process pool?

Answer

The concurrent.futures implementation does not support this use case.

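The timeout parameter accepted by its functions and methods only controls how long you wait for a result; it has no effect on the actual computation itself.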
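A minimal sketch of what that means for the code in the question, assuming Python 3: the timeout given to as_completed() is one overall deadline for the whole loop, and the TimeoutError is raised by the as_completed() iterator itself, so the except clause wrapped around future.result() never sees it. The sketch uses ThreadPoolExecutor only to keep the demo light (as_completed() handles futures from either executor the same way); slow_task and collect_with_deadline are hypothetical names.

```python
import concurrent.futures
import time


def slow_task(seconds):
    """Hypothetical stand-in for a long-running job."""
    time.sleep(seconds)
    return seconds


def collect_with_deadline(delays, deadline):
    """Collect results until an overall deadline passes.

    Returns (finished_results, timed_out, cancel_results_for_unfinished).
    """
    results = []
    timed_out = False
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(delays)) as executor:
        futures = [executor.submit(slow_task, d) for d in delays]
        try:
            # `timeout` bounds the whole loop, not each task, and the
            # TimeoutError comes from the as_completed() iterator itself,
            # not from future.result().
            for future in concurrent.futures.as_completed(futures, timeout=deadline):
                results.append(future.result())
        except concurrent.futures.TimeoutError:
            timed_out = True
        # cancel() cannot stop a task that is already running: it returns False.
        cancelled = [f.cancel() for f in futures if not f.done()]
        # Leaving the with block still waits for the slow task to finish.
    return results, timed_out, cancelled


results, timed_out, cancelled = collect_with_deadline([0.1, 1.5], deadline=0.5)
print(results, timed_out, cancelled)
```

This should print [0.1] True [False]: the slow task is still running when the deadline passes, cancel() refuses to stop it, and the program still waits for it when the executor shuts down. Nothing in concurrent.futures kills the running task.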

The pebble library supports this use case:

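from concurrent.futures import TimeoutError
from pebble import ProcessPool

def function(n):
    return n

# The guard is required wherever worker processes are spawned rather than
# forked (e.g. Windows, and macOS on Python 3.8+).
if __name__ == "__main__":
    with ProcessPool() as pool:
        # timeout=10: pebble terminates the worker if the task runs longer than 10 s
        future = pool.schedule(function, args=[1], timeout=10)

        try:
            result = future.result()  # blocks until the task finishes or times out
        except TimeoutError as error:
            # error.args[1] holds the timeout value that was exceeded
            print("function took longer than %d seconds" % error.args[1])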