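Python multiprocessing - terminate/restart worker process

I have a bunch of long-running tasks that I want to split across multiple processes. That part I can do without any problem. The problem I run into is that these processes sometimes go into a hung state. To work around this, I would like to set a time threshold for each task a process is working on. When the threshold is exceeded, I want to restart or terminate the task.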
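
For illustration, here is a minimal sketch of the per-task time limit described above. It assumes each task can run in its own process; `run_task_with_timeout` is a hypothetical helper name, and `time.sleep` stands in for real work.

```python
import multiprocessing
import time


def run_task_with_timeout(target, args, timeout):
    """Run target(*args) in its own process and stop it if it overruns.

    Returns True if the task finished in time, False if it was terminated.
    """
    proc = multiprocessing.Process(target=target, args=args)
    proc.start()
    proc.join(timeout)        # wait at most `timeout` seconds
    if proc.is_alive():
        proc.terminate()      # the task overran its time threshold
        proc.join()           # reap the terminated child
        return False
    return True


if __name__ == "__main__":
    print(run_task_with_timeout(time.sleep, (0.1,), timeout=5))
    print(run_task_with_timeout(time.sleep, (60,), timeout=0.5))
```

Starting one process per task costs more than reusing workers, but it keeps termination simple because nothing else depends on the process that gets killed.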
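Originally my code was very simple and used a process pool, but with a pool I could not figure out how to retrieve the processes inside the pool, let alone how to restart or terminate one of them.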
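
As a sketch of what a pool can and cannot do here: `AsyncResult.get(timeout=...)` reports that a task has overrun, but the worker keeps running it, and `Pool` offers no public call to stop a single worker. The helper names `run_with_deadline` and `demo` are hypothetical, and `pow` and `time.sleep` stand in for real tasks.

```python
import multiprocessing
import time


def run_with_deadline(pool, func, args, timeout):
    """Return (finished, value); finished is False if the deadline passed."""
    async_result = pool.apply_async(func, args)
    try:
        return True, async_result.get(timeout=timeout)
    except multiprocessing.TimeoutError:
        # The worker is still busy with the task at this point.
        return False, None


def demo():
    pool = multiprocessing.Pool(processes=2)
    try:
        quick = run_with_deadline(pool, pow, (2, 10), timeout=5)
        hung = run_with_deadline(pool, time.sleep, (30,), timeout=0.5)
    finally:
        # terminate() stops every worker, including the one still sleeping.
        pool.terminate()
        pool.join()
    return quick, hung


if __name__ == "__main__":
    print(demo())
```

This detects the hang but can only recover by tearing down the whole pool, which matches the limitation described above.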
I have since switched to using queues and Process objects, as shown in this example (https://pymotw.com/2/multiprocessing/communication.html#passing-messages-to-processes), with some changes.
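
Stripped to its core, the queue-and-process pattern looks roughly like the sketch below. The names `worker` and `multiply_all` are hypothetical, tasks are plain `(a, b)` tuples rather than callable objects, and there is no timeout handling yet.

```python
import multiprocessing


def worker(task_queue, result_queue):
    """Consume (a, b) pairs until a None poison pill arrives."""
    while True:
        task = task_queue.get()
        if task is None:
            break
        a, b = task
        result_queue.put((a, b, a * b))


def multiply_all(pairs, num_workers=2):
    """Distribute a list of pairs over worker processes; return sorted results."""
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(tasks, results))
               for _ in range(num_workers)]
    for proc in workers:
        proc.start()
    for pair in pairs:
        tasks.put(pair)
    for _ in workers:
        tasks.put(None)           # one poison pill per worker
    # Collect results before joining so no worker blocks on a full pipe.
    collected = [results.get() for _ in pairs]
    for proc in workers:
        proc.join()
    return sorted(collected)


if __name__ == "__main__":
    print(multiply_all([(1, 2), (3, 4), (5, 6)]))
```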
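My attempt is in the code below. In its current state the process is not actually terminated. Beyond that, I cannot figure out how to get the program to move on to the next task once the current task has been stopped. Any suggestions or help are appreciated; perhaps I am going about this the wrong way.
Thanks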
import multiprocess
import time

class Consumer(multiprocess.Process):
    def __init__(self, task_queue, result_queue, startTimes, name=None):
        multiprocess.Process.__init__(self)
        if name:
            self.name = name
        print('created process: {0}'.format(self.name))
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.startTimes = startTimes

    def stopProcess(self):
        elapseTime = time.time() - self.startTimes[self.name]
        print('killing process {0} {1}'.format(self.name, elapseTime))
        self.task_queue.cancel_join_thread()
        self.terminate()
        # now want to get the process to start processing another job

    def run(self):
        '''
        The process subclass calls this on a separate process.
        '''
        proc_name = self.name
        print(proc_name)
        while True:
            # pulling the next task off the queue and starting it
            # on the current process.
            task = self.task_queue.get()
            self.task_queue.cancel_join_thread()
            if task is None:
                # Poison pill means shutdown
                #print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            self.startTimes[proc_name] = time.time()
            answer = task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a, b, startTimes):
        self.a = a
        self.b = b
        self.startTimes = startTimes
        self.taskName = 'taskName_{0}_{1}'.format(self.a, self.b)

    def __call__(self):
        import time
        import os
        print('new job in process pid: {0} {1}'.format(os.getpid(), self.taskName))
        if self.a == 2:
            time.sleep(20000)  # simulate a hung process
        else:
            time.sleep(3)  # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':
    # Establish communication queues
    # tasks is the work queue; results holds the completed work
    tasks = multiprocess.JoinableQueue()
    results = multiprocess.Queue()
    #parentPipe, childPipe = multiprocess.Pipe(duplex=True)
    mgr = multiprocess.Manager()
    startTimes = mgr.dict()

    # Start consumers
    numberOfProcesses = 4
    processObjs = []
    for processNumber in range(numberOfProcesses):
        processObj = Consumer(tasks, results, startTimes)
        processObjs.append(processObj)
    for process in processObjs:
        process.start()

    # Enqueue jobs
    num_jobs = 30
    for i in range(num_jobs):
        tasks.put(Task(i, i + 1, startTimes))

    # Add a poison pill for each process object
    for i in range(numberOfProcesses):
        tasks.put(None)

    # process monitor loop
    killProcesses = {}
    executing = True
    while executing:
        allDead = True
        for process in processObjs:
            name = process.name
            #status = consumer.status.getStatusString()
            status = process.is_alive()
            pid = process.ident
            elapsedTime = 0
            if name in startTimes:
                elapsedTime = time.time() - startTimes[name]
            if elapsedTime > 10:
                process.stopProcess()
            print("{0} - {1} - {2} - {3}".format(name, status, pid, elapsedTime))
            if allDead and status:
                allDead = False
        if allDead:
            executing = False
        time.sleep(3)

    # Wait for all of the tasks to finish
    #tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result: {0}'.format(result))
        num_jobs -= 1
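
One possible direction, offered as a sketch rather than a drop-in fix: a `Process` object can only be started once, so a terminated worker cannot pick up another job and a fresh process has to take its place. The version below sidesteps that by giving every task its own process and tracking start times in the parent. `do_task`, `drain`, and `supervise` are hypothetical names, and `a == 2` simulates the hang as in the code above.

```python
import multiprocessing
import queue
import time


def do_task(a, b, result_queue):
    """Stand-in for Task: a == 2 simulates a hung job."""
    if a == 2:
        time.sleep(60)
    result_queue.put((a, b, a * b))


def drain(result_queue, sink, wait):
    """Move available results into sink, waiting at most `wait` s for the first."""
    try:
        sink.append(result_queue.get(timeout=wait))
        while True:
            sink.append(result_queue.get_nowait())
    except queue.Empty:
        pass


def supervise(pairs, max_workers=4, timeout=1.0, poll=0.05):
    """Run one process per task; terminate any task older than `timeout`."""
    results = multiprocessing.Queue()
    pending = list(pairs)
    running = []                      # (process, pair, start time)
    finished, killed = [], []
    while pending or running:
        # Fill free slots; every task gets a fresh Process object.
        while pending and len(running) < max_workers:
            pair = pending.pop(0)
            proc = multiprocessing.Process(target=do_task,
                                           args=pair + (results,))
            proc.start()
            running.append((proc, pair, time.time()))
        drain(results, finished, poll)
        still_running = []
        for proc, pair, started in running:
            if not proc.is_alive():
                proc.join()           # finished on its own
            elif time.time() - started > timeout:
                proc.terminate()      # overran its time threshold
                proc.join()
                killed.append(pair)
            else:
                still_running.append((proc, pair, started))
        running = still_running
    drain(results, finished, poll)    # pick up any late results
    return sorted(finished), killed


if __name__ == "__main__":
    done, dropped = supervise([(i, i + 1) for i in range(5)], max_workers=2)
    print(done, dropped)
```

Note that the `multiprocessing` documentation warns that terminating a process while it is using a shared queue or pipe can corrupt it. In this sketch the simulated hang happens before the result is written, so that case does not arise, but real tasks may need more care.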
You could reduce your code to a [mcve]. Right now it is too complex.
Read about [multiprocessing.html#synchronization-primitives](https://docs.python.org/3.6/library/multiprocessing.html#synchronization-primitives), section **multiprocessing.Event**. – stovfl
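
A rough sketch of how `multiprocessing.Event` might be used for this, assuming the work can be split into small steps that check a stop flag. `cooperative_worker` and `run_until_deadline` are hypothetical names. A worker that is truly hung never reaches the check, so `terminate()` is kept as a fallback.

```python
import multiprocessing
import time


def cooperative_worker(stop_event, steps_done):
    """Work in small steps, checking the stop flag between them."""
    while not stop_event.is_set():
        time.sleep(0.01)              # one small unit of work
        with steps_done.get_lock():
            steps_done.value += 1


def run_until_deadline(deadline):
    """Let the worker run for `deadline` seconds, then ask it to stop."""
    stop_event = multiprocessing.Event()
    steps_done = multiprocessing.Value("i", 0)
    proc = multiprocessing.Process(target=cooperative_worker,
                                   args=(stop_event, steps_done))
    proc.start()
    time.sleep(deadline)
    stop_event.set()                  # request a clean shutdown
    proc.join(5)                      # the worker should exit on its own
    stopped_cleanly = not proc.is_alive()
    if not stopped_cleanly:
        proc.terminate()              # fallback for a worker that ignores the flag
        proc.join()
    return stopped_cleanly, steps_done.value


if __name__ == "__main__":
    print(run_until_deadline(0.3))
```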