2012-03-09 49 views
0

我從來沒有與multiprocessing一起工作之前,所以忍受着我,如果我問一個基本的問題。多進程與進步

This answer提供了一個非常好的處理類,我適應了我的需求,它工作得很好。我試圖實現一個基本的進度條,我使用print語句進行測試,但它根本不工作(沒有任何輸出)。

我當前的代碼是這樣的:

class ParsingMaster(object): 
    def __init__(self, parser, input_file, output_file): 
    self.parser = parser 

    self.num_processes = cpu_count() 
    self.input_file = input_file 
    self.output_file = output_file 

    self.input_queue = Queue() 
    self.output_queue = Queue() 

    self.input_size = 0 

    self.input_process = Process(target=self.parse_input) 
    self.output_process = Process(target=self.write_output) 
    self.processes = [Process(target=self.process_row) for row in range(self.num_processes)] 

    self.input_process.start() 
    self.output_process.start() 

    for process in self.processes: 
     process.start() 

    self.input_process.join() 

    for process in self.processes: 
     process.join() 

    self.output_process.join() 

    def parse_input(self): 
    for index, row in enumerate(self.input_file): 
     self.input_queue.put([index, row]) 
     self.input_size = self.input_queue.qsize() 

    for i in range(self.num_processes): 
     self.input_queue.put('STOP') 

    def process_row(self): 
    for index, row in iter(self.input_queue.get, 'STOP'): 
     self.output_queue.put([index, row[0], self.parser.parse(row[1])]) 

    self.output_queue.put('STOP') 

    def write_output(self): 
    current = 0 
    buffer = {} 

    for works in range(self.num_processes): 
     for index, id, row in iter(self.output_queue.get, 'STOP'): 
     if index != current: 
      buffer[index] = [id] + row 
     else: 
      self.output_file.writerow([id] + row) 
      current += 1 

      while current in buffer: 
      self.output_file.writerow(buffer[current]) 
      del buffer[current] 
      current += 1 

      if self.input_size: 
       print float(current * 100)/float(self.input_size) 

一些測試後,我發現了一些奇怪的事情:

  • self.input_sizeparse_input()正確更新。
  • parse_input()結束,而write_output()仍在運行。
  • write_output()總是報告self.input_size = 0

任何人都可以告訴我我要去哪裏嗎?任何幫助是有幫助的,所以提前謝謝你。

回答

2

self.input_size是一個進程局部變量,每個進程都有自己的副本。根據multiprocessing documentation,您需要將數據包裝到容器中,如ValueArray以使其共享。

+0

謝謝。一小時前我設法弄清了自己的情況,但知道我正朝着正確的方向前進;) – Blender 2012-03-09 08:50:36