2016-02-19 126 views
1

我有一個會運行若干子進程的應用程序,並且我已經成功地爲主進程設置了 sys.excepthook 異常處理。現在,我想爲子進程設置同一個鉤子。我原以爲只要把主進程中使用的那幾行代碼原樣複製過去就可以了,但它不起作用:Python 的 sys.excepthook 只在主進程中生效,在子進程中不生效。

接下來是我的代碼:

class Consumer(multiprocessing.Process): 
     """Child process that installs an error hook and then divides by zero.

     This is the reproduction from the question: run() assigns
     sys.excepthook inside the child process and immediately raises
     ZeroDivisionError via ``1/0``.
     """

     def __init__(self, codec_status_queue, logger_queue): 
      """Store the two queues handed over by the creating process."""
      multiprocessing.Process.__init__(self) 
      self.codec_status_queue = codec_status_queue 
      self.logger_queue = logger_queue 

     def run(self): 
      """Install the handler as sys.excepthook, then raise on purpose."""
      # Set default unhandled exceptions handler 
      uncaughtErrorHandler = UncaughtErrorHandler(self.logger_queue) 
      sys.excepthook = uncaughtErrorHandler.error_handler 

      # Deliberate ZeroDivisionError used to exercise the hook.
      # NOTE(review): according to the question text this exception does
      # NOT reach the hook when raised here; presumably multiprocessing
      # handles exceptions escaping run() itself -- confirm in its docs.
      1/0 


    class UncaughtErrorHandler(object):
        """Report an unhandled exception through a queue and exit.

        The formatted report is put on ``logger_queue`` as a
        ``(level, message)`` tuple. When ``child_processes`` is given,
        each of them is terminated before the current process exits.
        """

        def __init__(self, logger_queue, child_processes=None):
            """Remember the logging queue and the optional child list."""
            self.logger_queue = logger_queue
            self.child_processes = child_processes

        def error_handler(self, type, value, trace_back):
            """Log the exception, stop any children, then exit.

            The signature matches what ``sys.excepthook`` is called with:
            exception class, exception instance, traceback object.
            """
            trace_formatted = "".join(traceback.format_tb(trace_back))
            # Not every exception carries a ``message`` attribute; fall
            # back to the exception itself so formatting cannot fail.
            message = getattr(value, 'message', value)
            exeption_message = "Unhandled Exception:\n Type: %s\n Value: %s\n Line: %s\n Traceback:\n %s" % (
                type, message, trace_back.tb_lineno, trace_formatted)
            # Use the queue given to this instance, not a module global
            # that may not exist in the process running the handler.
            self.logger_queue.put((LoggerThread.CRITICAL, exeption_message))

            if self.child_processes:
                self.stop_children()

            # Stopping this process
            sys.exit()

        def stop_children(self):
            """Log and terminate every process in ``child_processes``."""
            num_children = len(self.child_processes)
            self.logger_queue.put((LoggerThread.DEBUG, "Terminating child processes (%s)" % num_children))
            for process in self.child_processes:
                log_message = "Terminating %s with PID %s" % (process.name, process.pid)
                self.logger_queue.put((LoggerThread.DEBUG, log_message))
                process.terminate()


    # Script entry point (Python 2 syntax): build the consumer processes,
    # install the error handler in the main process, then start the consumers.
    if __name__ == '__main__': 
     ... 
     # Create processes and communication queues 
     codec_status_queue = multiprocessing.Queue() 

     # Two consumers per CPU reported by multiprocessing.cpu_count().
     num_consumers = multiprocessing.cpu_count() * 2 
     print 'Creating %d consumers' % num_consumers 
     # NOTE(review): logger_queue is not defined in the visible code;
     # presumably it is created in the elided "..." section above.
     consumers = [ Consumer(codec_status_queue, logger_queue) 
         for i in xrange(num_consumers) ] 

     # Set default unhandled exceptions handler 
     # The consumer list is passed so the handler can terminate them.
     uncaughtErrorHandler = UncaughtErrorHandler(logger_queue, consumers) 
     sys.excepthook = uncaughtErrorHandler.error_handler 

     # Start processes 
     for consumer in consumers: 
      # Each consumer is flagged as a daemon before it is started.
      consumer.daemon = True 
      consumer.start() 

如果我把 1/0 放在 __main__ 部分,UncaughtErrorHandler 會捕獲到異常;但是當 1/0 像上面的代碼那樣放在 Consumer.run() 中時,異常卻不會被捕獲。

也許有人可以告訴我我做錯了什麼?

回答

1

以下代碼是爲 Python 3.x 編寫的,但可以改寫爲適用於 Python 2.x。它提供了一個替代方案,用來取代在子進程中覆蓋 sys.excepthook 的做法。一個簡單的修復方法是:捕獲所有異常,並把 sys.exc_info 返回的數據交給異常處理程序。主進程也可以使用類似的模式來處理異常,但這裏保留了你程序中的原始設計。下面的例子應該是一個完整且可運行的演示,你可以隨意試驗並根據自己的需求進行調整。

#! /usr/bin/env python3 
import logging 
import multiprocessing 
import queue 
import sys 
import threading 
import time 
import traceback 


def main():
    """Run the demo: start logging, launch the consumers, then shut down.

    A LoggerThread drains a shared multiprocessing queue while twice as
    many Consumer processes as there are CPUs are created and started.
    An UncaughtErrorHandler that knows about every consumer is installed
    as sys.excepthook for this process. The logger thread is asked to
    stop in all cases, including when setup fails.
    """
    log_queue = multiprocessing.Queue()
    log_thread = LoggerThread(log_queue)
    log_thread.start()
    try:
        # Queue shared by every consumer for codec status reports.
        status_queue = multiprocessing.Queue()
        worker_count = 2 * multiprocessing.cpu_count()
        print('Creating {} consumers'.format(worker_count))
        workers = []
        for _ in range(worker_count):
            workers.append(Consumer(status_queue, log_queue))
        # The hook is installed before any consumer is started.
        handler = UncaughtErrorHandler(log_queue, workers)
        sys.excepthook = handler.error_handler
        for worker in workers:
            worker.start()
        # Give the consumers time to run and report before stopping.
        time.sleep(2)
    finally:
        log_thread.shutdown()


def get_message(value):
    """Retrieve an exception's error message and return it.

    The lookup order is: the ``message`` attribute when present, then the
    first element of a non-empty ``args``, and finally ``str(value)``.
    The last step guarantees a string instead of an implicit ``None``
    for exceptions created without arguments.
    """
    if hasattr(value, 'message'):
        return value.message
    args = getattr(value, 'args', None)
    if args:
        return args[0]
    return str(value)


class LoggerThread(threading.Thread):

    """Handle logging messages coming from various sources via a queue.

    Each queue item is a ``(level, message)`` tuple. The thread keeps
    draining the queue until shutdown() has been called and the queue has
    stayed empty for one polling interval.
    """

    CRITICAL = logging.CRITICAL
    DEBUG = logging.DEBUG

    def __init__(self, logger_queue):
        """Initialize an instance of the LoggerThread class."""
        super().__init__()
        self.logger_queue = logger_queue
        self.mutex = threading.Lock()
        self.running = False
        # A dedicated event records the shutdown request. run() assigns
        # ``running = True`` when it starts, which would otherwise erase a
        # shutdown() issued before the thread got scheduled.
        self._shutdown_requested = threading.Event()

    def run(self):
        """Process messages coming through the queue until shutdown."""
        self.running = True
        try:
            while True:
                # Always drain at least once so messages queued before a
                # very early shutdown() are still shown.
                self._drain()
                if self._shutdown_requested.is_set() or not self.running:
                    break
        finally:
            self.running = False

    def _drain(self):
        """Handle queued messages until the queue stays empty for 0.1 s."""
        try:
            while True:
                self.handle_message(*self.logger_queue.get(True, 0.1))
        except queue.Empty:
            pass

    def handle_message(self, level, message):
        """Show the message while ensuring a guaranteed order on screen."""
        with self.mutex:
            print('Level:', level)
            print('Message:', message)
            print('=' * 80, flush=True)

    def shutdown(self):
        """Signal the thread to exit once it runs out of messages."""
        self._shutdown_requested.set()
        self.running = False


class Consumer(multiprocessing.Process):

    """Simulate a consumer process that handles data from a queue."""

    def __init__(self, codec_status_queue, logger_queue):
        """Initialize an instance of the Consumer class."""
        super().__init__()
        self.codec_status_queue = codec_status_queue
        self.logger_queue = logger_queue
        self.daemon = True

    def run(self):
        """Begin working as a consumer while handling any exceptions.

        Any exception escaping do_consumer_work() is handed to an
        UncaughtErrorHandler together with the data from sys.exc_info().
        SystemExit is re-raised untouched, mirroring sys.excepthook,
        which is not invoked for SystemExit either; a deliberate exit is
        therefore not reported as an unhandled error.
        """
        try:
            self.do_consumer_work()
        except SystemExit:
            raise
        except BaseException:
            # The handler is only needed on failure, so build it here.
            uncaught_error_handler = UncaughtErrorHandler(self.logger_queue)
            uncaught_error_handler.error_handler(*sys.exc_info())

    def do_consumer_work(self):
        """Pretend to be doing the work of a consumer."""
        # Deliberately raises ZeroDivisionError to exercise the handler.
        junk = 1/0
        print('Process', self.ident, 'calculated', junk)


class UncaughtErrorHandler:

    """Organize error handling to automatically terminate child processes.

    Reports are put on ``logger_queue`` as ``(level, message)`` tuples.
    """

    def __init__(self, logger_queue, child_processes=None):
        """Initialize an instance of the UncaughtErrorHandler class."""
        self.logger_queue = logger_queue
        self.child_processes = child_processes

    def error_handler(self, kind, value, trace_back):
        """Record errors as they happen and terminate the process tree.

        The parameters are the exception class, the exception instance
        and the traceback, as delivered by sys.excepthook or
        sys.exc_info(). The method ends by calling sys.exit().
        """
        trace_formatted = ''.join(traceback.format_tb(trace_back))
        exception_message = ('Unhandled Exception:\n'
                             ' Type: {}\n'
                             ' Value: {}\n'
                             ' Line: {}\n'
                             ' Traceback:\n{}').format(
            kind, get_message(value), self._failing_line(trace_back),
            trace_formatted)
        self.logger_queue.put((LoggerThread.CRITICAL, exception_message))
        if self.child_processes:
            self.stop_children()
        # Stopping this process
        sys.exit()

    @staticmethod
    def _failing_line(trace_back):
        """Return the line number of the innermost frame, or None."""
        if trace_back is None:
            return None
        # The first traceback entry is the outermost caller; follow the
        # chain to the frame in which the exception was actually raised.
        while trace_back.tb_next is not None:
            trace_back = trace_back.tb_next
        return trace_back.tb_lineno

    def stop_children(self):
        """Terminate all children associated with this error handler."""
        num_children = len(self.child_processes)
        log_message = 'Terminating child processes({})'.format(num_children)
        self.logger_queue.put((LoggerThread.DEBUG, log_message))
        for process in self.child_processes:
            if process.pid is None:
                # Never started, so there is nothing to terminate and
                # terminate() would fail on such a process object.
                continue
            log_message = 'Terminating {} with PID {}'.format(
                process.name, process.pid)
            self.logger_queue.put((LoggerThread.DEBUG, log_message))
            process.terminate()


# Run the demonstration only when this file is executed as a script.
if __name__ == '__main__': 
    main() 
相關問題