2016-07-28

I am using a multiprocessing.queues.JoinableQueue as shown below. The multiprocessing.Queue seems to go away: is it the OS (destroying the pipe) or Python?

A very long-running thread (multiple days) polls an API for XML. The thread that does this simply parses the XML into objects and pushes them onto a queue.

Processing each object takes far longer than parsing the XML, and never depends on the thread reading from the API, so this is a straightforward use of multiprocessing.

The code that creates and cleans up the processes is here:

def queueAdd(self, item):
    try:
        self.queue.put(item)
    except AssertionError:
        # queue has been closed; remake it (let the GC collect the old one)
        logger.warn('Queue closed early.')
        self.queue = BufferQueue(ctx=multiprocessing.get_context())
        self.queue.put(item)
    except BrokenPipeError:
        # workaround for the pipe issue
        logger.warn('Broken pipe, forcing creation of new queue.')
        # all reading processes should suicide and new ones be spawned
        self.queue = BufferQueue(ctx=multiprocessing.get_context())
        # address = 'localhost'
        # if address in multiprocessing.managers.BaseProxy._address_to_local:
        #     del BaseProxy._address_to_local[address][0].connection
        self.queue.put(item)
    except Exception as e:
        # general thread exception
        logger.error('Buffer queue exception %s' % e)
        # TODO: continue trying/trap exceptions?
        raise
    # Check for finished consumers and clean them up before we check to see
    # if we need to add additional consumers.
    for csmr in list(self.running):  # iterate a copy, since we remove entries
        if not csmr.is_alive():
            debug('Child dead, releasing.')
            self.running.remove(csmr)

    # See if we should start a consumer...
    # TODO: add min/max processes (default and override)
    if not self.running:
        debug('Spawning consumer.')
        new_consumer = self.consumer(
            queue=self.queue,
            results_queue=self.results_queue,
            response_error=self.response_error)
        new_consumer.start()
        self.running.append(new_consumer)

The consumer process's control loop is fairly simple as well:

def run(self):
    '''Consumes the queue in the framework, passing off each item to the
    ItemHandler method.
    '''
    while True:
        try:
            item = self.queue.get(timeout=3)
            # the base class just logs this stuff
            rval = self.singleItemHandler(item)
            self.queue.task_done()
            if rval and self.results_queue:
                self.results_queue.put(rval)
        except queue.Empty:
            logging.debug('Queue timed out after 3 seconds.')
            break
        except EOFError:
            logging.info(
                '%s has finished consuming queue.' % (__class__.__name__))
            break
        except Exception as e:
            # general thread exception
            self.logger.error('Consumer exception %s' % e)
            # TODO: continue trying/trap exceptions?
            raise

After a while (roughly an hour of successful processing), I get a log message indicating the consumer process has died due to the timeout, DEBUG:root:Queue timed out after 3 seconds., yet the queue is still open and apparently still being written to by the original thread. The thread does not seem to recognize that the consumer process has terminated (see the queueAdd method) and never tries to start a new one. The queue does not appear to be empty; reads from it simply seem to time out.

I don't know why the manager thinks the child is still alive.

EDIT


I have revised the original question due to code changes in how the BrokenPipeError is logged, and removed the broken-connection cleanup. I believe that is a separate issue.


While carefully reviewing this question, I realized I had not included the log output for the BrokenPipe exception described here: http://stackoverflow.com/questions/3649458/broken-pipe-when-using-python-multiprocessing-managers-basemanager-syncmanager – SkyLeach


I added the extra logging described in the comment above, but after another test run I got no log message about a BrokenPipeError. – SkyLeach

Answer


The problem is caused by a subtle reality of multiprocessing: any process that calls queue.put runs a background "feeder" thread that writes the data into an OS pipe.

In my particular case, even though not much data was being posted to the results queue (only items that failed to process for some reason), it was still enough to fill up the pipe and keep the consumer processes from exiting even after their loops had finished. That in turn caused the write queue to slowly fill up.

The solution was to modify my non-blocking call so that each iteration of the API polling loop reads all results available so far, with the exception of the final (blocking) call, which makes sure every remaining result is collected.

def finish(self, block=True, **kwargs):
    '''
    Notifies the buffer that we are done filling it.
    This command binds to any processes still running, lets them
    finish, and then copies and flushes the managed results list.
    '''
    # close the queue and wait until it is consumed
    if block:
        self.queue.close()
        self.queue.join_thread()
        # make sure the consumers are done consuming the queue
        for csmr in self.running:
            # get everything on the results queue right now
            try:
                while csmr.is_alive():
                    self.results_list.append(
                        self.results_queue.get(timeout=0.5))
                    self.results_queue.task_done()
            except queue.Empty:
                if csmr.is_alive():
                    logger.warn('Result queue empty but consumer alive.')
                    logger.warn('joining %s.' % csmr.name)
                    csmr.join()
        del self.running[:]
        if self.callback:
            return self.callback(self.results_list)
    else:
        # read results immediately available
        try:
            while True:
                self.results_list.append(self.results_queue.get_nowait())
                self.results_queue.task_done()
        except queue.Empty:
            # got everything on the queue so far
            pass
    return self.results_list