Python multiprocessing - terminate/restart worker process

2017-08-01 130 views
0

I have a bunch of long-running jobs that I want to split up across multiple processes. That part I can do without a problem. The problem I run into is that sometimes these processes go into a hung state. To address this, I want to be able to set a time threshold for each task that a process is working on. When the time threshold is exceeded, I want to restart or terminate the task.

Initially my code was pretty simple and used a process pool, but with a pool I could not figure out how to retrieve the processes inside it, let alone how to restart or terminate a process in the pool.

I have since switched to using queues and Process objects, as shown in this example (https://pymotw.com/2/multiprocessing/communication.html#passing-messages-to-processes), with some variations.

My attempt to figure this out is in the code below. In its current state the process does not actually get terminated, and beyond that I cannot figure out how to get the process to move on to the next task after the current one is killed. Any suggestions/help would be appreciated; perhaps I am going about this the wrong way.

Thanks

import multiprocess
import time

class Consumer(multiprocess.Process):
    def __init__(self, task_queue, result_queue, startTimes, name=None):
        multiprocess.Process.__init__(self)
        if name:
            self.name = name
        print 'created process: {0}'.format(self.name)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.startTimes = startTimes

    def stopProcess(self):
        elapseTime = time.time() - self.startTimes[self.name]
        print 'killing process {0} {1}'.format(self.name, elapseTime)
        self.task_queue.cancel_join_thread()
        self.terminate()
        # now want to get the process to start processing another job

    def run(self):
        '''
        The process subclass calls this on a separate process.
        '''
        proc_name = self.name
        print proc_name
        while True:
            # pull the next task off the queue and start it
            # on the current process
            task = self.task_queue.get()
            self.task_queue.cancel_join_thread()

            if task is None:
                # poison pill means shutdown
                #print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            self.startTimes[proc_name] = time.time()
            answer = task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a, b, startTimes):
        self.a = a
        self.b = b
        self.startTimes = startTimes
        self.taskName = 'taskName_{0}_{1}'.format(self.a, self.b)

    def __call__(self):
        import time
        import os

        print 'new job in process pid:', os.getpid(), self.taskName

        if self.a == 2:
            time.sleep(20000)  # simulate a hung process
        else:
            time.sleep(3)  # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':
    # Establish communication queues:
    # tasks is the work queue, results receives completed work
    tasks = multiprocess.JoinableQueue()
    results = multiprocess.Queue()

    #parentPipe, childPipe = multiprocess.Pipe(duplex=True)
    mgr = multiprocess.Manager()
    startTimes = mgr.dict()

    # Start consumers
    numberOfProcesses = 4
    processObjs = []
    for processNumber in range(numberOfProcesses):
        processObj = Consumer(tasks, results, startTimes)
        processObjs.append(processObj)

    for process in processObjs:
        process.start()

    # Enqueue jobs
    num_jobs = 30
    for i in range(num_jobs):
        tasks.put(Task(i, i + 1, startTimes))

    # Add a poison pill for each process object
    for i in range(numberOfProcesses):
        tasks.put(None)

    # Process monitor loop
    killProcesses = {}
    executing = True
    while executing:
        allDead = True
        for process in processObjs:
            name = process.name
            #status = consumer.status.getStatusString()
            status = process.is_alive()
            pid = process.ident
            elapsedTime = 0
            if name in startTimes:
                elapsedTime = time.time() - startTimes[name]
            if elapsedTime > 10:
                process.stopProcess()

            print "{0} - {1} - {2} - {3}".format(name, status, pid, elapsedTime)
            if allDead and status:
                allDead = False
        if allDead:
            executing = False
        time.sleep(3)

    # Wait for all of the tasks to finish
    #tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1
+0

Could you reduce your code to a [mcve]? At the moment it is too complex. –

+1

Read up on [multiprocessing.html#synchronization-primitives](https://docs.python.org/3.6/library/multiprocessing.html#synchronization-primitives), section **multiprocessing.Event**. – stovfl
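
For reference, a minimal sketch of the Event-based shutdown this comment is pointing to (the names, worker count, and timeouts here are illustrative, not from the question; note that an Event can only stop a worker *between* tasks, it cannot interrupt one that is already hung):

import multiprocessing
import queue

def worker(task_queue, result_queue, stop_event):
    # Keep pulling work until the parent sets the stop event; use a short
    # get() timeout so the event is re-checked about once a second.
    while not stop_event.is_set():
        try:
            a, b = task_queue.get(timeout=1)
        except queue.Empty:
            continue
        result_queue.put('%s * %s = %s' % (a, b, a * b))

if __name__ == '__main__':
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    stop_event = multiprocessing.Event()

    workers = [multiprocessing.Process(target=worker,
                                       args=(tasks, results, stop_event))
               for _ in range(4)]
    for w in workers:
        w.start()

    for i in range(10):
        tasks.put((i, i + 1))
    for _ in range(10):
        print(results.get())

    stop_event.set()              # ask the workers to finish up
    for w in workers:
        w.join(timeout=5)         # grace period
        if w.is_alive():
            w.terminate()         # hard kill as a last resort

This only covers orderly shutdown; the answers below deal with the harder case of a task that never returns.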

Answers

1

I generally recommend against subclassing multiprocessing.Process, as it leads to code that is hard to read.

I would rather encapsulate your logic in a function and run it in a separate process; that keeps the code cleaner and more intuitive. Moreover, instead of reinventing the wheel, I would suggest you use a library that already solves the problem for you, such as Pebble or billiard.

The Pebble library, for example, allows you to set timeouts on processes running independently as well as on processes running within a Pool.

Running your function within a separate process with a timeout:

from pebble import concurrent 
from concurrent.futures import TimeoutError 

@concurrent.process(timeout=10) 
def function(foo, bar=0): 
    return foo + bar 

future = function(1, bar=2) 

try: 
    result = future.result() # blocks until results are ready 
except TimeoutError as error: 
    print("Function took longer than %d seconds" % error.args[1]) 

相同的例子,但有一個過程池。

from pebble import ProcessPool

with ProcessPool(max_workers=5, max_tasks=10) as pool:
    future = pool.schedule(function, args=[1], timeout=10)

    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])

In both cases, the timing-out process gets automatically terminated.
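
Applied to the question's workload, the hand-rolled Consumer/monitor machinery collapses into something like the sketch below. The multiply helper, the worker count, and the 10-second timeout mirror the question's numbers but are otherwise illustrative assumptions:

from pebble import ProcessPool
from concurrent.futures import TimeoutError
import time

def multiply(a, b):
    # stand-in for Task.__call__ above; a == 2 simulates the hung process
    time.sleep(20000 if a == 2 else 3)
    return '%s * %s = %s' % (a, b, a * b)

if __name__ == '__main__':
    with ProcessPool(max_workers=4) as pool:
        futures = [pool.schedule(multiply, args=[i, i + 1], timeout=10)
                   for i in range(30)]
        for future in futures:
            try:
                print('Result:', future.result())
            except TimeoutError:
                # the hung worker was killed and replaced; remaining
                # tasks keep running in fresh processes
                print('Result: task timed out')

Each timed-out future raises TimeoutError in the parent while Pebble replaces the killed worker behind the scenes, which is the restart-and-move-on behaviour the question asks for.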

2

A simpler solution than re-implementing the Pool is to keep using it and design a mechanism that times out the function you are running. For example:

from time import sleep
import signal

class TimeoutError(Exception):
    pass

def handler(signum, frame):
    raise TimeoutError()

def run_with_timeout(func, *args, timeout=10, **kwargs):
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout)
    try:
        res = func(*args, **kwargs)
    except TimeoutError as exc:
        print("Timeout")
        res = exc
    finally:
        signal.alarm(0)
    return res


def test():
    sleep(4)
    print("ok")

if __name__ == "__main__":
    import multiprocessing as mp

    p = mp.Pool()
    print(p.apply_async(run_with_timeout, args=(test,),
                        kwds={"timeout": 1}).get())

signal.alarm sets the timeout, and when the timeout expires, it runs the handler, which interrupts the execution of your function.

EDIT: If you are using a Windows system, it seems to be a bit more complicated, as signal does not implement SIGALRM. Another solution is to use the C-level Python API. This code has been adapted from this SO answer, with a bit of care taken to work on 64-bit systems. I have only tested it on Linux, but it should work the same on Windows.

import threading
import ctypes
from time import sleep


class TimeoutError(Exception):
    pass


def run_with_timeout(func, *args, timeout=10, **kwargs):
    interupt_tid = int(threading.get_ident())

    def interupt_thread():
        # Call the low level C python api using ctypes. tid must be converted
        # to c_long to be valid.
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(interupt_tid), ctypes.py_object(TimeoutError))
        if res == 0:
            print(threading.enumerate())
            print(interupt_tid)
            raise ValueError("invalid thread id")
        elif res != 1:
            # "if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect"
            ctypes.pythonapi.PyThreadState_SetAsyncExc(
                ctypes.c_long(interupt_tid), 0)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    timer = threading.Timer(timeout, interupt_thread)
    try:
        timer.start()
        res = func(*args, **kwargs)
    except TimeoutError as exc:
        print("Timeout")
        res = exc
    else:
        timer.cancel()
    return res


def test():
    sleep(4)
    print("ok")


if __name__ == "__main__":
    import multiprocessing as mp

    p = mp.Pool()
    print(p.apply_async(run_with_timeout, args=(test,),
                        kwds={"timeout": 1}).get())
    print(p.apply_async(run_with_timeout, args=(test,),
                        kwds={"timeout": 5}).get())
+0

I like this solution, but unfortunately it will not work for me because I have to be able to run this code on windoze! Any other suggestions? – Lafleur

+0

See my edit for a second solution that should work on Windows. Let me know if it works! –