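Python: How to time out a process to release control back to the concurrent.futures pool?
I have parallelized a large, CPU-intensive data processing task using concurrent.futures ProcessPoolExecutor, as shown below.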
import concurrent.futures

# In the script this block runs inside main() (see the traceback below).
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
    futures_ocr = [
        executor.submit(MyProcessor, folder)
        for folder in sub_folders
    ]
    is_cancel = wait_for(futures_ocr)
    if is_cancel:
        print 'shutting down executor'
        executor.shutdown()
def wait_for(futures):
    """Handles the future tasks after completion"""
    cancelled = False
    try:
        for future in concurrent.futures.as_completed(futures, timeout=200):
            try:
                result = future.result()
                print 'successfully finished processing folder: ', result.source_folder_path
            except concurrent.futures.TimeoutError:
                print 'TimeoutError occurred'
            except TypeError:
                print 'TypeError occurred'
    except KeyboardInterrupt:
        print '****** cancelling... *******'
        cancelled = True
        for future in futures:
            future.cancel()
    return cancelled
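For some folders the process appears to get stuck. This is not because of a bug in the code but because of the nature of the files being processed, which take a very long time. So I want to time out these processes so that they return once a certain time limit is exceeded. The pool could then reuse that process for the next available task.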
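One way to get "return after a time limit, then take the next task" is to enforce the deadline inside the worker itself, so the task raises and the worker process goes back to the pool. The following is a minimal sketch under stated assumptions, not a concurrent.futures feature. It is written for Python 3 (the question runs 2.7). It relies on `signal.setitimer` and SIGALRM, so it is Unix-only and works only in a process's main thread, which is where ProcessPoolExecutor workers run their tasks. `run_with_deadline` and `TaskTimeout` are hypothetical names. The alarm is delivered only when control returns to the Python interpreter, so a task blocked inside a C extension may not be interrupted promptly.

```python
import signal
import time


class TaskTimeout(Exception):
    """Raised inside a task that ran past its time budget (hypothetical name)."""


def _raise_timeout(signum, frame):
    # SIGALRM handler: turn the alarm into an exception in the running task.
    raise TaskTimeout()


def run_with_deadline(func, arg, seconds):
    """Call func(arg), raising TaskTimeout if it takes longer than `seconds`.

    Unix-only, main thread only. Tasks in a ProcessPoolExecutor worker run
    in that worker's main thread, so this can wrap a submitted callable.
    """
    previous = signal.signal(signal.SIGALRM, _raise_timeout)
    signal.setitimer(signal.ITIMER_REAL, seconds)  # one-shot wall-clock timer
    try:
        return func(arg)
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # disarm if func finished early
        signal.signal(signal.SIGALRM, previous)  # restore the old handler


if __name__ == "__main__":
    try:
        run_with_deadline(time.sleep, 5, 0.5)  # stands in for a stuck folder
    except TaskTimeout:
        print("task timed out; the worker is free again")
```

In the question's code this would mean submitting `executor.submit(run_with_deadline, MyProcessor, folder, 200)`. A slow folder then shows up as a `TaskTimeout` raised from `future.result()`, and the worker process moves on to the next folder. `TaskTimeout` has to be defined at module level so it can be pickled back to the parent.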
Adding a timeout to the as_completed() function produces an error when the timeout expires:
Traceback (most recent call last):
  File "call_ocr.py", line 96, in <module>
    main()
  File "call_ocr.py", line 42, in main
    is_cancel = wait_for(futures_ocr)
  File "call_ocr.py", line 59, in wait_for
    for future in concurrent.futures.as_completed(futures, timeout=200):
  File "/Users/saurav/.pyenv/versions/ocr/lib/python2.7/site-packages/concurrent/futures/_base.py", line 216, in as_completed
    len(pending), len(fs)))
concurrent.futures._base.TimeoutError: 3 (of 3) futures unfinished
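The traceback shows that the `timeout` passed to `as_completed()` is one deadline for the whole iteration, counted from the call. When that deadline passes, the TimeoutError is raised by the `for` loop's iterator, not by `future.result()`. The inner `except concurrent.futures.TimeoutError` therefore never sees it, because any future the loop yields is already done. The error also stops nothing: `Future.cancel()` only succeeds for futures that have not started running. The sketch below catches the error around the loop instead and reports which futures are still unfinished. It is Python 3 and uses threads rather than processes only to keep the demo small. `collect` and `sleepy` are hypothetical helpers.

```python
import concurrent.futures
import time


def sleepy(seconds):
    """Stand-in for a slow task (hypothetical)."""
    time.sleep(seconds)
    return seconds


def collect(futures, timeout):
    """Gather results until `timeout` seconds pass; return (results, unfinished)."""
    results = []
    try:
        for future in concurrent.futures.as_completed(futures, timeout=timeout):
            # Futures yielded here are already done, so result() does not wait.
            results.append(future.result())
    except concurrent.futures.TimeoutError:
        # Raised by the as_completed iterator once the overall deadline passes.
        pass
    unfinished = [f for f in futures if not f.done()]
    return results, unfinished


if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(sleepy, 0.05), pool.submit(sleepy, 1.5)]
        results, unfinished = collect(futures, timeout=0.5)
        print(results, len(unfinished))
        # The slow task is already running, so cancel() cannot stop it.
        print(unfinished[0].cancel())
```

If raising is not wanted at all, `concurrent.futures.wait(futures, timeout=200)` returns a `(done, not_done)` pair instead. Either way, the unfinished tasks keep running in their workers.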
What am I doing wrong here, and what is the best way to make a timed-out process stop and hand control back to the process pool?
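ProcessPoolExecutor offers no way to kill a task that is already running. A hard time limit therefore needs processes you can terminate yourself. The following is a minimal sketch under stated assumptions (Python 3, Unix). `run_all_with_hard_timeout` is a hypothetical helper, not a library API. It starts one `multiprocessing.Process` per folder, runs at most `workers` at a time, and calls `terminate()` on any process that runs past the deadline. It reports only an exit status per item, so it assumes each task writes its own output (for example, to disk). Returning values would need something like a `multiprocessing.Queue`. The default `fork` start method avoids re-importing the main module. However, `fork` has not been the default on macOS since Python 3.8 and can be unsafe there with some system libraries. With `spawn`, `func` must be importable and the call must sit under `if __name__ == "__main__":`.

```python
import multiprocessing
import time


def run_all_with_hard_timeout(func, items, workers, seconds,
                              start_method="fork", poll=0.05):
    """Run func(item) in separate processes, at most `workers` at once.

    Any process still alive after `seconds` is terminated. Items must be
    unique and hashable. Returns {item: "ok" | "failed" | "timeout"},
    based only on each process's exit status.
    """
    ctx = multiprocessing.get_context(start_method)  # assumption: Unix "fork"
    pending = list(items)
    running = {}  # item -> (process, start time)
    status = {}
    while pending or running:
        # Fill free slots with new processes.
        while pending and len(running) < workers:
            item = pending.pop(0)
            proc = ctx.Process(target=func, args=(item,))
            proc.start()
            running[item] = (proc, time.monotonic())
        # Reap finished processes and kill overdue ones.
        for item, (proc, started) in list(running.items()):
            if not proc.is_alive():
                proc.join()
                status[item] = "ok" if proc.exitcode == 0 else "failed"
                del running[item]
            elif time.monotonic() - started > seconds:
                proc.terminate()  # SIGTERM on Unix
                proc.join()
                status[item] = "timeout"
                del running[item]
        time.sleep(poll)
    return status
```

Unlike the SIGALRM approach, this also stops tasks stuck inside C code. The cost is a new process per folder instead of reusing pool workers.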