multiprocessing.Queue seems to go away? OS (pipe destruction) vs. Python?
I'm using a multiprocessing.queues.JoinableQueue as follows:
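A very long-running thread (it runs for days) polls an API for XML. All this thread does is parse the XML into objects and push them onto the queue.
Processing each object takes far more time than parsing the XML, and it does not depend on the API-reading thread in any way, so the multiprocessing implementation is simple.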
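A minimal, self-contained sketch of that producer/consumer split follows. It assumes a plain `multiprocessing.JoinableQueue` in place of the custom `BufferQueue`, a hypothetical `parse_records()` standing in for the XML parsing, and the `fork` start method (POSIX only) so it can run as a flat script; it is an illustration, not the poster's actual code.

```python
import multiprocessing
import queue
import threading

# Assumption: "fork" start method (POSIX only) so the sketch runs as a flat script.
CTX = multiprocessing.get_context("fork")


def parse_records(raw_items):
    """Hypothetical stand-in for the XML parsing: wrap each raw string in a dict."""
    for raw in raw_items:
        yield {"payload": raw}


def produce(work_q, raw_items):
    """Producer thread: parse and enqueue; it never waits on the consumers."""
    for record in parse_records(raw_items):
        work_q.put(record)


def consume(work_q, results_q):
    """Consumer process: handle items until the queue stays empty for 1 second."""
    while True:
        try:
            item = work_q.get(timeout=1)
        except queue.Empty:
            break
        try:
            results_q.put(item["payload"].upper())  # stand-in for slow processing
        finally:
            work_q.task_done()  # always acknowledge the item


def run_pipeline(raw_items, n_consumers=2):
    work_q = CTX.JoinableQueue()
    results_q = CTX.Queue()
    consumers = [CTX.Process(target=consume, args=(work_q, results_q))
                 for _ in range(n_consumers)]
    for p in consumers:
        p.start()
    producer = threading.Thread(target=produce, args=(work_q, raw_items))
    producer.start()
    producer.join()
    work_q.join()  # returns once task_done() was called for every item
    # Read all results before joining the consumers.
    results = [results_q.get(timeout=5) for _ in raw_items]
    for p in consumers:
        p.join()
    return sorted(results)
```

Each consumer exits on its own after one second without work, which mirrors the 3-second timeout in the `run()` loop below.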
The code that creates and cleans up the processes is here:
def queueAdd(self, item):
    try:
        self.queue.put(item)
    except (AssertionError, ValueError):
        # Queue has been closed: remake it (let the old one be GC'd).
        # Older Pythons raise AssertionError here; newer ones raise ValueError.
        logger.warning('Queue closed early.')
        self.queue = BufferQueue(ctx=multiprocessing.get_context())
        self.queue.put(item)
    except BrokenPipeError:
        # Workaround for pipe issue.
        logger.warning('Broken pipe, forcing creation of new queue.')
        # All reading processes should suicide and new ones be spawned.
        self.queue = BufferQueue(ctx=multiprocessing.get_context())
        # address = 'localhost'
        # if address in multiprocessing.managers.BaseProxy._address_to_local:
        #     del BaseProxy._address_to_local[address][0].connection
        self.queue.put(item)
    except Exception as e:
        # General thread exception.
        logger.error('Buffer queue exception %s', e)
        # TODO: continue trying/trap exceptions?
        raise
    # Check for finished consumers and clean them up before we check to see
    # if we need to add additional consumers. Iterate over a copy: removing
    # items from a list while iterating over it skips elements.
    for csmr in list(self.running):
        if not csmr.is_alive():
            debug('Child dead, releasing.')
            self.running.remove(csmr)
    # See if we should start a consumer...
    # TODO: add min/max processes (default and override)
    if not self.running:
        debug('Spawning consumer.')
        new_consumer = self.consumer(
            queue=self.queue,
            results_queue=self.results_queue,
            response_error=self.response_error)
        new_consumer.start()
        self.running.append(new_consumer)
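The cleanup loop, as originally posted, removed entries from `self.running` while iterating over that same list, which makes the loop skip the element right after each removal. The sketch below shows the effect with a hypothetical `FakeWorker` class (an assumption, standing in for a consumer `Process`; only `is_alive()` matters) and compares it with rebuilding the list.

```python
class FakeWorker:
    """Hypothetical stand-in for a consumer Process; only is_alive() matters here."""

    def __init__(self, name, alive):
        self.name = name
        self._alive = alive

    def is_alive(self):
        return self._alive


def reap_buggy(running):
    # Mirrors the original loop: mutating the list while iterating over it.
    for w in running:
        if not w.is_alive():
            running.remove(w)
    return running


def reap_fixed(running):
    # Build a new list of live workers instead of mutating during iteration.
    return [w for w in running if w.is_alive()]


def make_workers():
    return [FakeWorker("a", False), FakeWorker("b", False), FakeWorker("c", True)]
```

With two dead workers in a row, the buggy version keeps the second one. In the question only one consumer is ever started, so this probably does not explain the symptom on its own, but it matters once the min/max-processes TODO is implemented.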
The consumer process's control loop is quite simple as well:
def run(self):
    '''Consumes the queue in the framework, passing off each item to the
    ItemHandler method.
    '''
    while True:
        try:
            item = self.queue.get(timeout=3)
            # The base class just logs this stuff.
            rval = self.singleItemHandler(item)
            self.queue.task_done()
            if rval and self.results_queue:
                self.results_queue.put(rval)
        except queue.Empty:
            logging.debug('Queue timed out after 3 seconds.')
            break
        except EOFError:
            logging.info(
                '%s has finished consuming queue.', __class__.__name__)
            break
        except Exception as e:
            # General thread exception.
            self.logger.error('Consumer exception %s', e)
            # TODO: continue trying/trap exceptions?
            raise
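In this loop, `task_done()` is skipped whenever `singleItemHandler` raises, which would leave a later `JoinableQueue.join()` waiting forever. The sketch below is the same loop as a plain function with `task_done()` moved into a `finally`. It assumes a hypothetical `handler` callable in place of `singleItemHandler` and a plain list in place of `results_queue`, and, unlike the original, it counts failures and keeps going instead of re-raising. It runs in a single process only to keep the demonstration short.

```python
import multiprocessing
import queue


def consume_until_idle(work_q, handler, results, timeout=0.5):
    """Drain work_q, calling handler(item) on each; stop after `timeout` idle seconds.

    `handler` and `results` are hypothetical stand-ins for singleItemHandler
    and results_queue.
    """
    failures = 0
    while True:
        try:
            item = work_q.get(timeout=timeout)
        except queue.Empty:
            break  # same exit condition as the original 3-second timeout
        try:
            rval = handler(item)
            if rval:
                results.append(rval)
        except Exception:
            failures += 1  # a real consumer would log this
        finally:
            work_q.task_done()  # acknowledged whether or not handler succeeded
    return failures


def demo():
    q = multiprocessing.JoinableQueue()
    for x in [1, 2, 0, 4]:
        q.put(x)
    results = []
    failures = consume_until_idle(q, lambda x: 10 // x, results)
    q.join()  # returns immediately: every item was acknowledged
    return results, failures
```

The item `0` makes the handler raise `ZeroDivisionError`, yet `q.join()` still returns because that item was acknowledged too.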
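After a while (about an hour of successful processing), I get a log message indicating that the consumer process has died because of a timeout:
DEBUG:root:Queue timed out after 3 seconds.
However, the queue is still open and apparently still being written to by the original thread. The thread does not seem to think the consumer process has terminated (see the queueAdd method), so it never tries to start a new one. The queue does not appear to be empty; reading from it just seems to time out.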
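One thing worth ruling out: `queueAdd` rebinds `self.queue` when a put fails, but a consumer that is already running keeps the queue object it was constructed with (and, being a separate process, cannot be handed the new one afterwards). It can therefore time out on the old, no-longer-fed queue while the producer keeps writing to the new one. This only applies if one of the queue-recreating branches actually ran, which their warning messages would show. A minimal illustration, using hypothetical `Producer` and `Consumer` classes and the standard-library `queue.Queue` in place of `BufferQueue` (all assumptions):

```python
import queue


class Consumer:
    """Hypothetical consumer: captures the queue object it is given at creation."""

    def __init__(self, q):
        self.queue = q


class Producer:
    """Hypothetical producer mirroring queueAdd's 'remake the queue' branch."""

    def __init__(self):
        self.queue = queue.Queue()
        self.consumer = Consumer(self.queue)

    def remake_queue(self):
        # Rebinding the attribute does not touch the consumer's reference.
        self.queue = queue.Queue()


def demo():
    producer = Producer()
    producer.remake_queue()
    producer.queue.put("item")
    same_object = producer.consumer.queue is producer.queue
    try:
        producer.consumer.queue.get(timeout=0.1)
        consumer_got_item = True
    except queue.Empty:
        consumer_got_item = False
    return same_object, consumer_got_item, producer.queue.qsize()
```

The consumer times out even though the producer's (new) queue holds an item.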
I don't know why the manager thinks the child is still alive.
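One documented behaviour produces exactly this symptom: the `multiprocessing` programming guidelines warn that a process which has put items on a queue will not terminate until all its buffered items have been flushed to the underlying pipe (unless it called `cancel_join_thread()`). The consumer puts results on `results_queue`; if nothing reads that queue, the consumer can leave `run()` after its timeout and still report `is_alive() == True`. Whether this is what happens here is an assumption worth testing. The sketch reproduces the effect with a 1 MiB payload, assumed to exceed the OS pipe buffer, and assumes the `fork` start method (POSIX only).

```python
import multiprocessing

# Assumption: "fork" start method (POSIX only) so the sketch runs as a flat script.
CTX = multiprocessing.get_context("fork")


def child(results_q):
    # Put more data than the OS pipe buffer can hold, then return immediately.
    results_q.put(b"x" * (1 << 20))
    # The target has returned, but the process still waits for the queue's
    # feeder thread to flush the buffered item before it can exit.


def demo():
    results_q = CTX.Queue()
    p = CTX.Process(target=child, args=(results_q,))
    p.start()
    p.join(timeout=1)
    alive_before_drain = p.is_alive()  # True: blocked flushing to the pipe
    payload = results_q.get(timeout=5)  # reading the queue unblocks the child
    p.join(timeout=5)
    return alive_before_drain, p.is_alive(), p.exitcode, len(payload)
```

If the results are not needed, calling `cancel_join_thread()` on `results_queue` in the consumer avoids the hang at the cost of possibly losing unflushed items; draining `results_queue` in the parent keeps them.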
EDIT
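I have amended the original question because of a code change in how BrokenPipeError is logged, and I removed the broken-connection cleanup. I think that is a separate issue.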
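On closer review of this issue, I realized that I wasn't logging the BrokenPipe exception as described here: http://stackoverflow.com/questions/3649458/broken-pipe-when-using-python-multiprocessing-managers-basemanager-syncmanager – SkyLeach
I've added the extra logging described in the comment above, but another test run produced no log messages about BrokenPipeError. – SkyLeach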