2017-08-27

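I have several processes that are meant to run in a while loop. Basically, some of them collect data, and before they stop I want them to save that data to a CSV or JSON file. What I have right now overrides the join method of the multiprocessing.Process class and calls the parent implementation through super(). How can I run a piece of code when I call process.join?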

import multiprocessing

class Processor(multiprocessing.Process):
    def __init__(self, arguments):
        multiprocessing.Process.__init__(self)

    def run(self):
        self.main_function()

    def main_function(self):
        while True:
            pass  # do things to incoming data

    def function_on_join(self):
        pass  # do one last thing before the process ends

    def join(self, timeout=None):
        self.function_on_join()
        super(Processor, self).join(timeout=timeout)

Is there a better, more correct, or more Pythonic way to do this?

Answers


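I suggest you take a look at the concurrent.futures module. It is a good fit if you can describe your work as a list of tasks to be completed by a pool of workers.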

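Task-based multiprocessing

When you have a sequence of jobs (for example, a list of file names) and you want them processed in parallel, you can do it as follows: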

from concurrent.futures import ProcessPoolExecutor
import requests

def get_url(url):
    resp = requests.get(url)
    print(f'{url} - {resp.status_code}')
    return url

jobs = ['http://google.com', 'http://python.org', 'http://facebook.com']

if __name__ == '__main__':
    # create a process pool of 3 workers
    with ProcessPoolExecutor(max_workers=3) as pool:
        # run each job in parallel and gather the returned values
        return_values = list(pool.map(get_url, jobs))

    print(return_values)

Output:

http://google.com - 200 
http://python.org - 200 
http://facebook.com - 200 
['http://google.com', 'http://python.org', 'http://facebook.com'] 
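
One detail worth illustrating: pool.map returns the results in the order of the inputs, even when the jobs finish in a different order. The sketch below is only an illustration of that behaviour; slow_square and run_jobs are hypothetical names, not part of the original answer, and the sleep is there purely to make later jobs finish first.

```python
import time
from concurrent.futures import ProcessPoolExecutor

def slow_square(n):
    # Hypothetical job: smaller inputs sleep longer, so they finish last.
    time.sleep(0.1 * (3 - n))
    return n * n

def run_jobs(jobs, workers=3):
    # map() yields results in input order, regardless of completion order.
    with ProcessPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(slow_square, jobs))

if __name__ == '__main__':
    print(run_jobs([0, 1, 2]))
```

With three workers the job for 2 completes first, yet the printed list is still [0, 1, 4].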

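Non-task-based multiprocessing

When you just want to run several subprocesses that do not consume jobs as in the first case, you may want to use multiprocessing.Process.

You can use it in a procedural style or in an OOP style, much like threading.Thread.

Example in the procedural style (IMHO more Pythonic):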

import os
from multiprocessing import Process

def func():
    print(f'hello from: {os.getpid()}')

if __name__ == '__main__':
    processes = [Process(target=func) for _ in range(4)]  # creates 4 processes

    for process in processes:
        # daemonic children are terminated when the main program exits,
        # so keep the main program alive if their output matters
        process.daemon = True
        process.start()  # start the process

Output:

hello from: 31821 
hello from: 31822 
hello from: 31823 
hello from: 31824 
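
For comparison, the OOP style mentioned above could look roughly like this. It is a minimal sketch, not code from the original answer: Greeter, label, and make_message are hypothetical names, and a multiprocessing.Queue is assumed as the way to hand a value back to the parent.

```python
import os
from multiprocessing import Process, Queue

class Greeter(Process):
    def __init__(self, label, results):
        super().__init__()
        self.label = label      # hypothetical per-worker label
        self.results = results  # queue used to send data back to the parent

    def make_message(self):
        return f'hello from {self.label}'

    def run(self):
        # run() executes in the child process once start() has been called.
        self.results.put((self.make_message(), os.getpid()))

if __name__ == '__main__':
    results = Queue()
    workers = [Greeter(f'worker-{i}', results) for i in range(4)]
    for worker in workers:
        worker.start()
    # Read one result per worker before joining.
    for _ in workers:
        print(results.get())
    for worker in workers:
        worker.join()
```

Only run() is overridden here; start() and join() keep their default behaviour.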

Waiting for processes to finish

If you want to wait for them using Process.join() (for more information on process.join() and process.daemon, see this SO answer), you can do it like this:

import os
import time
from multiprocessing import Process

def func():
    time.sleep(3)
    print(f'hello from: {os.getpid()}')

if __name__ == '__main__':
    processes = [Process(target=func) for _ in range(4)]  # creates 4 processes

    for process in processes:
        process.start()  # start the process

    for process in processes:
        process.join()  # wait for the process to finish

    print('all processes are done!')

This outputs:

hello from: 31980 
hello from: 31983 
hello from: 31981 
hello from: 31982 
all processes are done! 
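
Coming back to the question of saving data before a process ends: join() is called from the parent, so code placed in an overridden join() runs in the parent process and not in the worker. One possible alternative, sketched here under assumptions, is to have the parent signal the worker with a multiprocessing.Event and let the worker save its own data before returning. The names collect and save_records, the file name, and the fake data are all hypothetical.

```python
import json
import os
import tempfile
import time
from multiprocessing import Event, Process

def save_records(records, path):
    # Write the collected data to a JSON file.
    with open(path, 'w') as fh:
        json.dump(records, fh)

def collect(stop_event, path):
    records = []
    try:
        # Loop until the parent asks the worker to stop.
        while not stop_event.is_set():
            records.append(len(records))  # stand-in for real incoming data
            time.sleep(0.01)
    finally:
        # Runs inside the worker, just before it exits.
        save_records(records, path)

if __name__ == '__main__':
    path = os.path.join(tempfile.gettempdir(), 'collector_demo.json')
    stop_event = Event()
    worker = Process(target=collect, args=(stop_event, path))
    worker.start()
    time.sleep(0.2)   # let the worker gather some data
    stop_event.set()  # ask the worker to finish
    worker.join()     # wait until it has saved and exited
    with open(path) as fh:
        print(len(json.load(fh)), 'records saved')
```

With this arrangement join() stays untouched, and the save step happens in the process that actually holds the data.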