You can use a Queue to feed failures back into the Pool through a loop in the initiating Process:
import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch: report the input so it can be retried
        f.q.put(x)
        return None
    return x * x

def f_init(q):
    # runs once in each worker process; makes the shared queue reachable from f
    f.q = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, pending)
    retry_results = []
    while len(successful) < total_items:
        # both iterators simply yield nothing once they are exhausted
        successful.extend([r for r in results if r is not None])
        successful.extend([r for r in retry_results if r is not None])
        # drain the failures reported so far and resubmit them to the same pool
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items)
    p.close()
    p.join()

    print("Results: %s" % successful)
    print("Failures: %s" % failure_tracker)

if __name__ == '__main__':
    main(range(1, 10))
The output looks like this:
Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]
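A Pool can't be shared between multiple processes, hence this Queue-based approach. If you try to pass a pool as a parameter to the pool's own processes, you will get this error: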
NotImplementedError: pool objects cannot be passed between processes or pickled
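As a small illustration of where that error comes from, the sketch below tries to pickle a pool directly, which is what would have to happen for it to be sent to a worker. The helper name `pickling_error` is made up for this example, and it assumes Python 3 on CPython:

```python
import multiprocessing as mp
import pickle


def pickling_error(obj):
    """Return the message raised when pickling obj, or None if it pickles fine.

    Hypothetical helper, for illustration only.
    """
    try:
        pickle.dumps(obj)
    except NotImplementedError as exc:
        return str(exc)
    return None


if __name__ == '__main__':
    pool = mp.Pool(1)
    try:
        # Task arguments are pickled before they reach a worker,
        # so a pool that refuses to be pickled cannot be passed along.
        print(pickling_error(pool))
    finally:
        pool.close()
        pool.join()
```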
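Alternatively, you could try a few immediate retries within your function f, to avoid the synchronization overhead. It really comes down to how long your function should wait before retrying, and how likely success is if it retries immediately.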
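A minimal sketch of the immediate-retry idea, assuming Python 3. The names `TransientError`, `flaky_square`, `square_with_retries`, and the limit of 3 attempts are all invented for this example; the coin-flip failure mirrors f above:

```python
import multiprocessing as mp
import random

MAX_ATTEMPTS = 3  # assumed limit; tune it to how likely a quick retry is to succeed


class TransientError(Exception):
    """Hypothetical stand-in for whatever failure the real task raises."""


def flaky_square(x):
    # Fails roughly half the time, like f above
    if random.getrandbits(1):
        raise TransientError(x)
    return x * x


def square_with_retries(x, attempts=MAX_ATTEMPTS):
    """Retry inside the worker; return (x, result), or (x, None) if every attempt failed."""
    for _ in range(attempts):
        try:
            return x, flaky_square(x)
        except TransientError:
            continue  # retry right away, with no round trip to the parent
    return x, None


if __name__ == '__main__':
    with mp.Pool() as pool:
        outcomes = pool.map(square_with_retries, range(1, 10))
    print("Results: %s" % [r for _, r in outcomes if r is not None])
    print("Still failing: %s" % [x for x, r in outcomes if r is None])
```

Items that still fail after the in-worker attempts could then be handed to the Queue-based loop above.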
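Old answer: For the sake of completeness, here is my old answer. It is not as optimal as resubmitting directly into the pool, but it may still be relevant depending on the use case, because it provides a natural way to deal with or limit n-level retries: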
You can use a Queue to aggregate the failures and resubmit them at the end of each run, over multiple runs:
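import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch: report the input so it can be retried
        f.q.put(x)
        return None
    return x * x

def f_init(q):
    # runs once in each worker process; makes the shared queue reachable from f
    f.q = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        # a fresh queue and pool for every run
        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()

        # fine for a handful of small items; with many queued failures,
        # drain q before joining so the workers can flush and exit
        p.join()
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        successful = [r for r in results if r is not None]
        print("(%d) Succeeded: %s" % (run_number, successful))
        print("(%d) Failed: %s" % (run_number, failed_items))
        print("")
        # whatever failed in this run becomes the input of the next run
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))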
The output looks something like this:
(1) Succeeded: [9, 16, 36, 81]
(1) Failed: [2, 1, 5, 7, 8]
(2) Succeeded: [64]
(2) Failed: [2, 1, 5, 7]
(3) Succeeded: [1, 25]
(3) Failed: [2, 7]
(4) Succeeded: [49]
(4) Failed: [2]
(5) Succeeded: [4]
(5) Failed: []
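To make the "limit n-level retries" point concrete, here is one possible sketch, assuming Python 3. The names `run_once`, `run_with_limit`, and `max_runs` are invented for this example. To keep it short, it reports failures through the return value of f instead of a Queue, so it is a variation on the run-by-run code above rather than the same code:

```python
import multiprocessing as mp
import random


def f(x):
    # Returns (input, result); a result of None marks a failure
    if random.getrandbits(1):
        return x, None
    return x, x * x


def run_once(jobs):
    """One pool run; returns (successful results, inputs that failed)."""
    pool = mp.Pool()
    try:
        outcomes = pool.map(f, jobs)
    finally:
        pool.close()
        pool.join()
    successful = [r for _, r in outcomes if r is not None]
    failed = [x for x, r in outcomes if r is None]
    return successful, failed


def run_with_limit(pending, max_runs, run=run_once):
    """Rerun the failures until none are left or max_runs runs have been made."""
    successful = []
    pending = list(pending)
    runs = 0
    while pending and runs < max_runs:
        done, pending = run(pending)
        successful.extend(done)
        runs += 1
    # pending now holds the items that never succeeded within the limit
    return successful, pending


if __name__ == '__main__':
    results, gave_up = run_with_limit(range(1, 10), max_runs=3)
    print("Results: %s" % results)
    print("Gave up on: %s" % gave_up)
```

The `run` parameter exists only so the loop can be exercised without starting real worker processes.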
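Maybe you want to `return f(x)` instead of raising `ValueError`? Just guessing... – 2012-07-24 02:03:16
How high is the chance of failure in the actual application? That is, how important is it for a process to retry right away, compared with waiting for the other processes to finish first? – Isaac 2012-07-24 02:05:12
There is a moderate chance of failure, and it does not need to be retried immediately (but it should eventually be retried in parallel). – ash 2012-07-24 05:29:48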