2010-08-26 75 views

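Python subprocess timeout and large output (>64K)

I want to execute a process, limit its execution time with a timeout given in seconds, and capture the output the process produces. I want to do this on Windows, Linux and FreeBSD.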

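I have tried implementing this in three different ways:

  1. cmd - without a timeout, with subprocess.PIPE for output capture.

    Behavior: operates as expected, but does not support a timeout, and I need a timeout.

  2. cmd_to - with a timeout and subprocess.PIPE for output capture.

    Behavior: blocks execution of the subprocess when the output is >= 2^16 bytes.

  3. cmd_totf - with a timeout and tempfile.NamedTemporaryFile for output capture.

    Behavior: operates as expected, but uses temporary files on disk.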

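All three are shown below for closer inspection.

As the output below shows, the timeout code blocks the execution of the subprocess when subprocess.PIPE is used and the subprocess writes 2^16 bytes of output or more.

The subprocess documentation states that this is expected when calling process.wait() while using subprocess.PIPE. It gives no such warning for process.poll(), so what is going wrong here?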
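The stall described above can be reproduced in isolation. The following is a minimal Python 3 sketch, not the code from this question: the child command, the function name `stalled_then_drained` and the half-second wait are illustrative assumptions. It shows that a child with a large output is still running after a period of polling without reading, and that it finishes once the pipe is drained.

```python
import subprocess
import sys
import time


def stalled_then_drained(size=1 << 20, wait=0.5):
    """Start a child that writes `size` bytes, wait without reading, then drain."""
    # Hypothetical child: writes `size` bytes to stdout and exits.
    child_code = "import sys; sys.stdout.write('b' * %d)" % size
    p = subprocess.Popen([sys.executable, "-c", child_code],
                         stdout=subprocess.PIPE)

    # Nobody reads the pipe during this wait. Once the OS pipe buffer is
    # full, the child's write blocks, so the child cannot exit.
    time.sleep(wait)
    still_running = p.poll() is None

    # communicate() reads until end-of-file, which unblocks the child.
    out, _ = p.communicate()
    return still_running, len(out), p.returncode


if __name__ == "__main__":
    print(stalled_then_drained())
```

A loop that only calls poll() and sleep() is in the same position as the sleep in this sketch: nothing reads the pipe, so the child never gets the chance to exit.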

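I have a working solution in cmd_totf, which uses the tempfile module, but the trade-off is that it writes the output to disk, which I would really like to avoid.

So my questions are:

  • What am I doing wrong in cmd_to?
  • Is there a way to do what I want without using temporary files, i.e. while keeping the output in memory?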

Script to generate a bunch of output ('exp_gen.py'):

#!/usr/bin/env python 
import sys 
output = "b"*int(sys.argv[1]) 
print output 

Three different implementations (cmd, cmd_to, cmd_totf) of wrappers around subprocess.Popen:

#!/usr/bin/env python 
import subprocess, time, tempfile 
bufsize = -1 

def cmd(cmdline, timeout=60):
    """
    Execute cmdline.
    Uses subprocess and subprocess.PIPE.
    """
    # 'timeout' is accepted for a uniform signature but is not enforced here.

    p = subprocess.Popen(
        cmdline,
        bufsize = bufsize,
        shell = False,
        stdin = subprocess.PIPE,
        stdout = subprocess.PIPE,
        stderr = subprocess.PIPE
    )

    out, err = p.communicate()
    returncode = p.returncode

    return (returncode, err, out)

def cmd_to(cmdline, timeout=60):
    """
    Execute cmdline, limit execution time to 'timeout' seconds.
    Uses subprocess and subprocess.PIPE.
    """

    p = subprocess.Popen(
        cmdline,
        bufsize = bufsize,
        shell = False,
        stdin = subprocess.PIPE,
        stdout = subprocess.PIPE,
        stderr = subprocess.PIPE
    )

    t_begin = time.time()    # Monitor execution time
    seconds_passed = 0

    while p.poll() is None and seconds_passed < timeout:
        seconds_passed = time.time() - t_begin
        time.sleep(0.1)

    #if seconds_passed > timeout:
    #
    #    try:
    #        p.stdout.close() # If they are not closed the fds will hang around until
    #        p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception
    #        p.terminate()    # Important to close the fds prior to terminating the process!
    #                         # NOTE: Are there any other "non-freed" resources?
    #    except:
    #        pass
    #
    #    raise TimeoutInterrupt

    out, err = p.communicate()
    returncode = p.returncode

    return (returncode, err, out)

def cmd_totf(cmdline, timeout=60):
    """
    Execute cmdline, limit execution time to 'timeout' seconds.
    Uses subprocess and tempfile instead of subprocess.PIPE.
    """

    output = tempfile.NamedTemporaryFile(delete=False)
    error = tempfile.NamedTemporaryFile(delete=False)

    p = subprocess.Popen(
        cmdline,
        bufsize = 0,
        shell = False,
        stdin = None,
        stdout = output,
        stderr = error
    )

    t_begin = time.time()    # Monitor execution time
    seconds_passed = 0

    while p.poll() is None and seconds_passed < timeout:
        seconds_passed = time.time() - t_begin
        time.sleep(0.1)

    #if seconds_passed > timeout:
    #
    #    try:
    #        p.stdout.close() # If they are not closed the fds will hang around until
    #        p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception
    #        p.terminate()    # Important to close the fds prior to terminating the process!
    #                         # NOTE: Are there any other "non-freed" resources?
    #    except:
    #        pass
    #
    #    raise TimeoutInterrupt

    p.wait()

    returncode = p.returncode

    # Read back what the child wrote to the temporary files
    fd = open(output.name)
    out = fd.read()
    fd.close()

    fd = open(error.name)
    err = fd.read()
    fd.close()

    error.close()
    output.close()

    return (returncode, err, out)

if __name__ == "__main__":

    implementations = [cmd, cmd_to, cmd_totf]
    bytes = ['65535', '65536', str(1024*1024)]
    timeouts = [5]

    for timeout in timeouts:
        for size in bytes:
            for i in implementations:
                t_begin = time.time()
                seconds_passed = 0
                rc, err, output = i(['exp_gen.py', size], timeout)
                seconds_passed = time.time() - t_begin
                filler = ' '*(8-len(i.func_name))
                print "[%s%s: timeout=%d, iosize=%s, seconds=%f]" % (repr(i.func_name), filler, timeout, size, seconds_passed)

Output from execution:

['cmd'     : timeout=5, iosize=65535, seconds=0.016447]
['cmd_to'  : timeout=5, iosize=65535, seconds=0.103022]
['cmd_totf': timeout=5, iosize=65535, seconds=0.107176]
['cmd'     : timeout=5, iosize=65536, seconds=0.028105]
['cmd_to'  : timeout=5, iosize=65536, seconds=5.116658]
['cmd_totf': timeout=5, iosize=65536, seconds=0.104905]
['cmd'     : timeout=5, iosize=1048576, seconds=0.025964]
['cmd_to'  : timeout=5, iosize=1048576, seconds=5.128062]
['cmd_totf': timeout=5, iosize=1048576, seconds=0.103183]

Try the answer from http://stackoverflow.com/questions/874815/how-do-i-get-real-time-information-back-from-a-subprocess-popen-in-python-2-5. – 2010-12-04 21:05:31


You should mention which version of Python you are using. AFAIK, there were quite a few changes to the 'subprocess' module between 2.6 and 2.7. – 2011-12-12 04:19:49


See also http://stackoverflow.com/questions/1191374/subprocess-with-timeout/8507775#8507775 – bortzmeyer 2011-12-14 16:15:09

Answers


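Contrary to all the warnings in the subprocess documentation, reading directly from process.stdout and process.stderr has provided a better solution.

By better I mean that I can read the output of a process that writes more than 2^16 bytes without having to store the output temporarily on disk.

The code follows: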

import fcntl
import os
import subprocess
import time

# Not defined in the original snippet; 0 follows the comment in Popen() below.
bufsize = 0

# Not defined in the original snippet; assumed to be a plain Exception subclass.
class TimeoutInterrupt(Exception):
    pass

def nonBlockRead(output):
    # Switch the pipe to non-blocking mode, then read what is available
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except IOError: # nothing to read right now
        return ''

def cmd(cmdline, timeout=60):
    """
    Execute cmdline, limit execution time to 'timeout' seconds.
    Uses the subprocess module and subprocess.PIPE.

    Raises TimeoutInterrupt
    """

    p = subprocess.Popen(
        cmdline,
        bufsize = bufsize, # default value of 0 (unbuffered) is best
        shell = False, # not really needed; it's disabled by default
        stdout = subprocess.PIPE,
        stderr = subprocess.PIPE
    )

    t_begin = time.time() # Monitor execution time
    seconds_passed = 0

    stdout = ''
    stderr = ''

    while p.poll() is None and seconds_passed < timeout: # Monitor process
        time.sleep(0.1) # Wait a little
        seconds_passed = time.time() - t_begin

        # p.std* blocks on read(), which messes up the timeout timer.
        # To fix this, we use a nonblocking read()
        # Note: Not sure if this is Windows compatible
        stdout += nonBlockRead(p.stdout)
        stderr += nonBlockRead(p.stderr)

    if seconds_passed >= timeout:
        try:
            p.stdout.close() # If they are not closed the fds will hang around until
            p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception
            p.terminate()    # Important to close the fds prior to terminating the process!
                             # NOTE: Are there any other "non-freed" resources?
        except Exception:
            pass

        raise TimeoutInterrupt

    # Collect anything written between the last read and process exit
    stdout += nonBlockRead(p.stdout)
    stderr += nonBlockRead(p.stderr)

    returncode = p.returncode

    return (returncode, stdout, stderr)
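
On Python 3.3 and later, Popen.communicate() accepts a timeout argument and keeps reading both pipes while it waits, so no fcntl call is needed. The following is a minimal sketch under that assumption. The name `run_with_timeout` is hypothetical, and this is an alternative to the code above rather than part of it.

```python
import subprocess
import sys


def run_with_timeout(cmdline, timeout=60):
    """Run cmdline and return (returncode, stdout, stderr) as bytes.

    Raises subprocess.TimeoutExpired if the child runs longer than `timeout`.
    """
    p = subprocess.Popen(cmdline,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    try:
        # communicate() drains both pipes, so a large output cannot stall the child.
        out, err = p.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        p.kill()         # stop the child ...
        p.communicate()  # ... then reap it and close the pipes
        raise
    return p.returncode, out, err


if __name__ == "__main__":
    # Illustrative child that writes 1 MiB to stdout.
    big = [sys.executable, "-c", "import sys; sys.stdout.write('b' * (1 << 20))"]
    rc, out, err = run_with_timeout(big, timeout=30)
    print(rc, len(out))
```

The output stays in memory, and the same call is available on Windows, Linux and FreeBSD.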

This is nice, but it blocks on read if there is no output to read, which messes up the timer. I have fixed that in my version and added an edit. – 2011-12-11 08:37:31


@JohnDoe: ['fcntl' doesn't work on Windows](http://stackoverflow.com/q/375427/4279) – jfs 2014-02-26 07:17:11


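Disclaimer: this answer has not been tested on Windows or on FreeBSD, but the modules it uses should work on those systems. I believe it is a working answer to your question; it works for me.

Here is the code I just hacked together to solve the problem on Linux. It is a combination of several Stack Overflow threads and my own research in the Python 3 documentation.

Main features of this code:

  • Uses processes, not threads, for the blocking I/O, because they can be terminated more reliably with p.terminate()
  • Implements a retriggerable timeout watchdog that restarts counting whenever some output arrives
  • Implements a long-term timeout watchdog to limit the overall runtime
  • Can feed stdin (although I only need to feed short strings once)
  • Can capture stdout/stderr by the usual Popen means (only stdout is coded, and stderr is redirected to stdout, but the two can easily be separated)
  • It is almost real-time because it only checks for output every 0.2 seconds, but you can reduce this or remove the waiting interval easily
  • Lots of debugging printouts are still in place to show what happens when.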
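
The two watchdogs in that list can be illustrated with a much smaller sketch. This is not the answer's implementation: it assumes Python 3, uses a reader thread instead of a separate process, and the names `run_watched`, `WatchdogTimeout` and `_pump` are hypothetical.

```python
import queue
import subprocess
import sys
import threading
import time


class WatchdogTimeout(Exception):
    """Raised with 'runtime' or 'no_output' to say which limit was hit."""


def _pump(stream, q):
    # Blocking reads happen here, off the main thread.
    for chunk in iter(lambda: stream.read1(4096), b""):
        q.put(chunk)
    q.put(None)  # sentinel: the child closed its output


def run_watched(cmd, timeout_runtime, timeout_no_output, poll_interval=0.1):
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    q = queue.Queue()
    threading.Thread(target=_pump, args=(p.stdout, q), daemon=True).start()

    begin = last_output = time.monotonic()
    chunks = []
    while True:
        try:
            chunk = q.get(timeout=poll_interval)
        except queue.Empty:
            chunk = b""            # nothing arrived during this interval
        if chunk is None:          # reader thread saw end-of-file
            break
        now = time.monotonic()
        if chunk:
            chunks.append(chunk)
            last_output = now      # retrigger the no-output watchdog

        limit = None
        if now - begin >= timeout_runtime:
            limit = "runtime"
        elif now - last_output >= timeout_no_output:
            limit = "no_output"
        if limit:
            p.kill()
            p.wait()
            raise WatchdogTimeout(limit)

    # Assumes the child exits soon after closing its output.
    return p.wait(), b"".join(chunks)


if __name__ == "__main__":
    print(run_watched([sys.executable, "-c", "print('hello')"], 40, 2))
```

Because the reader is a daemon thread, this sketch relies on killing the child to unblock the read, whereas the answer's design terminates separate reader processes.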

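The only code dependency is enum, as implemented here, but the code could easily be changed to work without it. It is only used to distinguish the two timeouts; use separate exceptions if you prefer.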
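
As a sketch of those two options: the first uses the functional API of the `enum` module that ships with Python 3.4 and later, which may differ from the implementation this answer depends on. The second drops the enum and uses one exception subclass per timeout. The two subclass names are hypothetical.

```python
import enum

# Option 1: standard-library enum with the same three member names.
Timeout = enum.Enum("Timeout", ["No", "Retriggerable", "Runtime"])


# Option 2: no enum at all; the exception type says which timeout fired.
class ErrorRunCmd(Exception):
    pass


class ErrorRunCmdTimeOut(ErrorRunCmd):
    pass


class ErrorRunCmdNoOutputTimeOut(ErrorRunCmdTimeOut):
    """The retriggerable (no-output) timeout expired."""


class ErrorRunCmdRuntimeTimeOut(ErrorRunCmdTimeOut):
    """The overall runtime limit expired."""
```

With the second option a caller can catch ErrorRunCmdTimeOut for either timeout, or one of the subclasses to handle a specific one.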

Here is the code. As always, feedback is highly appreciated. (Edit 29 June 2012: the code now actually works.)

# Python module runcmd 
# Implements a class to launch shell commands which 
# are killed after a timeout. Timeouts can be reset 
# after each line of output 
# 
# Use inside other script with: 
# 
# import runcmd 
# (return_code, out) = runcmd.RunCmd(['ls', '-l', '/etc'], 
#         timeout_runtime, 
#         timeout_no_output, 
#         stdin_string).go() 
# 

import multiprocessing 
import queue 
import subprocess 
import time 

import enum 

def timestamp(): 
    return time.strftime('%Y%m%d-%H%M%S') 


class ErrorRunCmd(Exception): pass 
class ErrorRunCmdTimeOut(ErrorRunCmd): pass 

class Enqueue_output(multiprocessing.Process):
    def __init__(self, out, queue):
        multiprocessing.Process.__init__(self)
        self.out = out
        self.queue = queue
        self.daemon = True
    def run(self):
        try:
            for line in iter(self.out.readline, b''):
                #print('worker read:', line)
                self.queue.put(line)
        except ValueError: pass # Readline of closed file
        self.out.close()

class Enqueue_input(multiprocessing.Process):
    def __init__(self, inp, iterable):
        multiprocessing.Process.__init__(self)
        self.inp = inp
        self.iterable = iterable
        self.daemon = True
    def run(self):
        #print("writing stdin")
        for line in self.iterable:
            self.inp.write(bytes(line,'utf-8'))
        self.inp.close()
        #print("writing stdin DONE")

class RunCmd():
    """RunCmd - class to launch shell commands

    Captures and returns stdout. Kills child after a given
    amount (timeout_runtime) wallclock seconds. Can also
    kill after timeout_retriggerable wallclock seconds.
    This second timer is reset whenever the child does some
    output

        (return_code, out) = RunCmd(['ls', '-l', '/etc'],
                                    timeout_runtime,
                                    timeout_no_output,
                                    stdin_string).go()

    """
    Timeout = enum.Enum('No','Retriggerable','Runtime')

    def __init__(self, cmd, timeout_runtime, timeout_retriggerable, stdin=None):
        self.dbg = False
        self.cmd = cmd
        self.timeout_retriggerable = timeout_retriggerable
        self.timeout_runtime = timeout_runtime
        self.timeout_hit = self.Timeout.No
        self.stdout = '--Cmd did not yield any output--'
        self.stdin = stdin
    def read_queue(self, q):
        time_last_output = None
        try:
            bstr = q.get(False) # non-blocking
            if self.dbg: print('{} chars read'.format(len(bstr)))
            time_last_output = time.time()
            self.stdout += bstr
        except queue.Empty:
            #print('queue empty')
            pass
        return time_last_output
    def go(self):
        if self.stdin:
            pstdin = subprocess.PIPE
        else:
            pstdin = None
        p = subprocess.Popen(self.cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=pstdin)
        pin = None
        if pstdin:
            pin = Enqueue_input(p.stdin, [self.stdin + '\n'])
            pin.start()
        q = multiprocessing.Queue()
        pout = Enqueue_output(p.stdout, q)
        pout.start()
        try:
            if self.dbg: print('Beginning subprocess with timeout {}/{} s on {}'.format(self.timeout_retriggerable, self.timeout_runtime, time.asctime()))
            time_begin = time.time()
            time_last_output = time_begin
            seconds_passed = 0
            self.stdout = b''
            once = True # ensure loop's executed at least once
                        # some child cmds may exit very fast, but still produce output
            while once or p.poll() is None or not q.empty():
                once = False
                if self.dbg: print('a) {} of {}/{} secs passed and overall {} chars read'.format(seconds_passed, self.timeout_retriggerable, self.timeout_runtime, len(self.stdout)))

                tlo = self.read_queue(q)
                if tlo:
                    time_last_output = tlo

                now = time.time()
                if now - time_last_output >= self.timeout_retriggerable:
                    self.timeout_hit = self.Timeout.Retriggerable
                    raise ErrorRunCmdTimeOut(self)
                if now - time_begin >= self.timeout_runtime:
                    self.timeout_hit = self.Timeout.Runtime
                    raise ErrorRunCmdTimeOut(self)

                if q.empty():
                    time.sleep(0.1)
            # Final try to get "last-millisecond" output
            self.read_queue(q)
        finally:
            self._close(p, [pout, pin])
        return (self.returncode, self.stdout)

    def _close(self, p, procs):
        if self.dbg:
            if self.timeout_hit != self.Timeout.No:
                print('{} A TIMEOUT occurred: {}'.format(timestamp(), self.timeout_hit))
            else:
                print('{} No timeout occurred'.format(timestamp()))
        for process in [proc for proc in procs if proc]:
            try:
                process.terminate()
            except:
                print('{} Process termination raised trouble'.format(timestamp()))
                raise
        try:
            p.stdin.close()
        except: pass
        if self.dbg: print('{} _closed stdin'.format(timestamp()))
        try:
            p.stdout.close() # If they are not closed the fds will hang around until
        except: pass
        if self.dbg: print('{} _closed stdout'.format(timestamp()))
        #p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception
        try:
            p.terminate() # Important to close the fds prior to terminating the process!
                          # NOTE: Are there any other "non-freed" resources?
        except: pass
        if self.dbg: print('{} _closed Popen'.format(timestamp()))
        try:
            self.stdout = self.stdout.decode('utf-8')
        except: pass
        self.returncode = p.returncode
        if self.dbg: print('{} _closed all'.format(timestamp()))

Use with:

import runcmd 

cmd = ['ls', '-l', '/etc'] 

worker = runcmd.RunCmd(cmd, 
         40, # limit runtime [wallclock seconds] 
         2,  # limit runtime after last output [wallclk secs] 
         ''  # stdin input string 
         ) 
(return_code, out) = worker.go() 

if worker.timeout_hit != worker.Timeout.No:
    print('A TIMEOUT occurred: {}'.format(worker.timeout_hit))
else:
    print('No timeout occurred')


# cmd is a list, so join it before formatting it with {:s}
print("Running '{:s}' returned {:d} and {:d} chars of output".format(' '.join(cmd), return_code, len(out)))
print('Output:') 
print(out) 

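command, the first argument, should be a list containing a command and its arguments. It is used for the Popen(shell=False) call. The timeouts are given in seconds. There is currently no code to disable the timeouts. Set timeout_no_output to timeout_runtime to effectively disable the retriggerable timeout_no_output. stdin_string can be any string that is to be sent to the command's standard input. Set it to None if your command does not need any input. If a string is provided, a final '\n' is appended.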