
I have some Python code that uses multiprocessing.pool.Pool.imap_unordered to create a bunch of temporary files in parallel in a CPU-bound step. I then read the filenames from the result iterator, process each file in a second, disk-bound step, and delete it. Usually the disk-bound step is the faster of the two, so each temporary file is processed and deleted before the next one is created. However, when running on a network filesystem, the disk-bound step can become the slow one, in which case the CPU-bound step running in parallel starts producing temporary files faster than the disk-bound step can process and delete them, and a large backlog of temporary files starts to accumulate. To avoid this, I would like the parallel iteration to pause if it gets more than 10 items ahead of the consumer. Is there an alternative to multiprocessing.pool.Pool.imap_unordered? Can I tell Python's multiprocessing pool not to get too far ahead?

Here is some example code that simulates the problem:

from time import sleep
from multiprocessing.pool import Pool

input_values = list(range(10)) 

def fast_step(x): 
    print("Running fast step for {x}".format(x=x)) 
    return x 

def slow_step(x): 
    print("Starting slow step for {x}".format(x=x)) 
    sleep(1) 
    print("Finishing slow step for {x}".format(x=x)) 
    return x 

mypool = Pool(2) 

step1_results = mypool.imap(fast_step, input_values) 

for i in step1_results: 
    slow_step(i) 

Running this produces something like:

$ python temp.py 
Running fast step for 0 
Running fast step for 1 
Running fast step for 2 
Running fast step for 3 
Running fast step for 4 
Starting slow step for 0 
Running fast step for 5 
Running fast step for 6 
Running fast step for 7 
Running fast step for 8 
Running fast step for 9 
Finishing slow step for 0 
Starting slow step for 1 
Finishing slow step for 1 
Starting slow step for 2 
Finishing slow step for 2 
Starting slow step for 3 
Finishing slow step for 3 
Starting slow step for 4 
Finishing slow step for 4 
Starting slow step for 5 
Finishing slow step for 5 
Starting slow step for 6 
Finishing slow step for 6 
Starting slow step for 7 
Finishing slow step for 7 
Starting slow step for 8 
Finishing slow step for 8 
Starting slow step for 9 
Finishing slow step for 9 

Answers


I think you could create a Queue to store your temporary files. Since a Queue can be given a maximum size, queue.put will block when the queue is full until space frees up again, which makes it easy to pause your producer process.
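
A minimal sketch of that idea, assuming a single producer and consumer over a bounded multiprocessing.Queue (the produce/consume helpers and the sleep timings are illustrative, not from the question):

from multiprocessing import Process, Queue
from time import sleep

def produce(q):
    # q.put() blocks once 10 unconsumed items are queued, pausing the
    # fast producer until the slow consumer catches up.
    for x in range(20):
        print("Produced", x)
        q.put(x)
    q.put(None)  # sentinel: no more items

def consume(q):
    for item in iter(q.get, None):
        sleep(0.5)  # simulate the slow, disk-bound step
        print("Consumed", item)

if __name__ == "__main__":
    q = Queue(maxsize=10)  # never more than 10 items buffered ahead
    producer = Process(target=produce, args=(q,))
    producer.start()
    consume(q)
    producer.join()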


I took the suggestion to use multiprocessing.Queue, and after a lot of experimentation I came up with an imap-like function that runs work in parallel on multiple CPUs, like Pool.imap_unordered, but avoids getting too far ahead by using queues of limited size.

from time import sleep
from multiprocessing import Queue, Process
from contextlib import contextmanager

def feed_queue(q, items, sentinel=None, num_sentinels=0):
    """Feed items into the queue, then enqueue sentinels to mark the end."""
    for x in items:
        if x == sentinel:
            break
        # print("Feeding {:.1f} into queue".format(x))
        q.put(x)
    for i in range(num_sentinels):
        q.put(sentinel)

class Sentinel:
    """End-of-stream marker; all instances compare equal to each other."""
    def __eq__(self, other):
        return isinstance(other, Sentinel)

class ParallelMapWorkerProcess(Process):
    """Apply target to each item read from q_in and put the result on q_out."""
    def __init__(self, target, q_in, q_out, sentinel=None, *args, **kwargs):
        self.q_in = q_in
        self.q_out = q_out
        self.target = target
        self.sentinel_value = sentinel
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            while True:
                x = self.q_in.get()
                if x == self.sentinel_value:
                    break
                result = self.target(x)
                self.q_out.put(result)
            # Keep emitting the sentinel so the consumer is guaranteed to
            # see one; the put blocks once the queue is full, and the
            # process is eventually killed by the context manager below.
            while True:
                self.q_out.put(self.sentinel_value)
        except KeyboardInterrupt:
            pass

@contextmanager
def parallel_imap_buffered(func, iterable, num_cpus=1, buffer_size=1):
    # The bounded queues provide the backpressure: put() blocks when a
    # queue is full, so the feeder and workers pause instead of racing
    # ahead of the consumer.
    input_queue = Queue(1)
    input_queue.cancel_join_thread()  # don't block exit flushing queued data
    output_queue = Queue(buffer_size)
    output_queue.cancel_join_thread()
    sentinel = Sentinel()
    feeder_proc = Process(target=feed_queue,
                          args=(input_queue, iterable, sentinel, num_cpus))
    worker_procs = [ParallelMapWorkerProcess(func, input_queue, output_queue, sentinel)
                    for i in range(num_cpus)]
    try:
        feeder_proc.start()
        for p in worker_procs:
            p.start()
        # iter(callable, sentinel) keeps calling output_queue.get() until
        # a sentinel is returned.
        yield iter(output_queue.get, sentinel)
    finally:
        feeder_proc.terminate()
        for p in worker_procs:
            p.terminate()

This abstracts away the details of managing the queues and processes. It can be used like this:

def fast_step(x): 
    print("Running fast step for {:.1f}".format(x)) 
    return x + 0.1 

def slow_step(x): 
    print("Starting slow step for {:.1f}".format(x)) 
    # Make it slow 
    sleep(0.1) 
    print("Finishing slow step for {:.1f}".format(x)) 
    return x + 0.1 

input_values = range(50) 

with parallel_imap_buffered(fast_step, input_values, num_cpus=4, buffer_size=2) as fast_step_results, \
     parallel_imap_buffered(slow_step, fast_step_results, num_cpus=1, buffer_size=2) as slow_step_results:
    for x in slow_step_results:
        if x >= 10:
            raise Exception("Mid-pipeline exception")
        print('Got value: {:.1f}'.format(x))
    print("Finished")

Using a context manager lets the iterator kill the child processes as soon as it is no longer needed, whether or not the iterator has been exhausted. As demonstrated, this appears to work even when the main process raises an exception. If anyone can produce a case where this code fails, deadlocks, or otherwise misbehaves, please leave a comment. (Edit: after some more testing, this code is not ideal; it can run into problems when exceptions are raised from certain places.)
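
For instance, a minimal sketch of early termination (reusing fast_step from above; the cutoff value is arbitrary): breaking out of the loop leaves the with block, and its finally clause terminates the feeder and worker processes:

with parallel_imap_buffered(fast_step, range(100), num_cpus=4, buffer_size=2) as results:
    for x in results:
        if x >= 5.0:
            break  # leaving the with block terminates all child processes
print("Stopped early; children cleaned up")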

I'm a bit surprised that something like this isn't built into the Python standard library.