2011-05-06 81 views
12

I am trying hard to understand what the Pythonic way is to solve this simple problem: piping a large amount of data to the stdin of a subprocess started with Popen.

My problem is simple: if you use the following code, it hangs. This is well documented in the subprocess module documentation.

import subprocess 

proc = subprocess.Popen(['cat','-'], 
         stdin=subprocess.PIPE, 
         stdout=subprocess.PIPE, 
         ) 
for i in range(100000): 
    proc.stdin.write('%d\n' % i) 
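    # hangs here eventually: cat blocks writing to its unread stdout,
    # stops reading its stdin, and this write then blocks too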
output = proc.communicate()[0] 
print output 

Looking for a solution (there is a very insightful thread, but I have since lost it), I found this solution (among others) that uses an explicit fork:

import os 
import sys 
from subprocess import Popen, PIPE 

def produce(to_sed): 
    for i in range(100000): 
        to_sed.write("%d\n" % i) 
        to_sed.flush() 
    # this would happen implicitly, anyway, but is here for the example 
    to_sed.close() 

def consume(from_sed): 
    while 1: 
        res = from_sed.readline() 
        if not res: 
            sys.exit(0) 
            # sys.exit(proc.poll()) 
        print 'received: ', [res] 

def main(): 
    proc = Popen(['cat', '-'], stdin=PIPE, stdout=PIPE) 
    to_sed = proc.stdin 
    from_sed = proc.stdout 

    pid = os.fork() 
    if pid == 0: 
        from_sed.close() 
        produce(to_sed) 
        return 
    else: 
        to_sed.close() 
        consume(from_sed) 

if __name__ == '__main__': 
    main() 

This solution is conceptually very easy to understand, but it uses one extra process and is far too low-level compared to the subprocess module (which exists precisely to hide this kind of thing...).

I would like to know: is there a simple and clean solution using the subprocess module that does not hang, or, to implement this pattern, must I take a step back and write an old-style select loop or an explicit fork?

Thanks

+1

You can use threads instead of fork (better compatibility with non-UNIX systems, and arguably more readable), but apart from that I think the example you gave is fine. A select loop would likewise "multiplex" the operations in a single thread, but it would not be simpler than this. – wump 2011-05-06 12:59:47

+0

Blocking up front with `Popen.wait()` would cause a deadlock (and a hang), but I used `Popen.communicate()` to get around that. I believe it uses some internal polling loop to shuttle the data into buffers. Does it actually hang when you try it, or does it just take a very long time to run? – 2011-05-06 13:05:22

+0

uhmmm... since the subprocess module is an abstraction over low-level process management, I am surprised it does not cover this simple use case. – 2011-05-06 14:13:36

Answers

8

If you want a pure Python solution, you need to put either the reader or the writer in a separate thread. The threading package is a lightweight way to do that, with convenient access to common objects and no messy forking.

import subprocess 
import threading 
import sys 

proc = subprocess.Popen(['cat','-'], 
         stdin=subprocess.PIPE, 
         stdout=subprocess.PIPE, 
         ) 
def writer(): 
    for i in range(100000): 
        proc.stdin.write('%d\n' % i) 
    proc.stdin.close() 
thread = threading.Thread(target=writer) 
thread.start() 
for line in proc.stdout: 
    sys.stdout.write(line) 
thread.join() 
proc.wait() 

It would be neat to see the subprocess module modernized to support streams and coroutines; that would allow pipelines that mix Python pieces and shell pieces to be built more elegantly.

+0

Just in case it is not completely obvious: if you don't need the output in Python, drop `stdout=PIPE` and you don't need a separate thread; you can write to `proc.stdin` in the same thread. Unrelated: you can use `with proc.stdin:` to close it even if an exception happens during the write. – jfs 2016-06-10 16:40:11
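
A minimal sketch of what this comment describes (assuming the child's output is not needed back in Python; `cat` is just an illustrative consumer):

import subprocess 

# No stdout=PIPE: the child writes straight to our stdout, so no reader 
# thread is needed and the writer cannot deadlock against an unread pipe. 
proc = subprocess.Popen(['cat', '-'], stdin=subprocess.PIPE) 
with proc.stdin:  # closes stdin even if a write raises 
    for i in range(100000): 
        proc.stdin.write(b'%d\n' % i) 
proc.wait() 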

1

For this kind of thing, the shell works a lot better than subprocess.

Write very simple Python applications that read from sys.stdin and write to sys.stdout (a sketch follows below).

Connect the simple applications together with a shell pipeline.

If needed, use subprocess to launch the pipeline, or just write a one-line shell script.

python part1.py | python part2.py 

This is very, very efficient. And as long as you keep it very simple, it is also portable across all of Linux (and Windows).
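
As a sketch, such a stdin-to-stdout filter can be as small as this (a hypothetical part1.py for the pipeline above):

# part1.py - a hypothetical stdin-to-stdout filter for the pipeline above 
import sys 

for line in sys.stdin: 
    sys.stdout.write(line.upper())  # any per-line transformation goes here 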

+0

I know there are 1001 ways to do it. I am asking for the Pythonic way :) Call me a purist :) – 2011-05-06 12:35:05

+0

@user741720: I gave you the Pythonic solution: use `sys.stdin` and `sys.stdout`, and avoid unnecessarily complex `subprocess` code. The purist approach is to write as little code as possible, and to write that little code as cleanly as possible. The OS does this best (fastest, with the least overhead) when you don't insert extra Python processing into the middle of already highly-optimized OS code. – 2011-05-06 12:38:15

0

Here is an example of using a pipe to read one record at a time from gzip (Python 3):

from subprocess import Popen, PIPE 

cmd = ['gzip', '-dc', 'compressed_file.gz']  # argument list, so no shell is needed 
pipe = Popen(cmd, stdout=PIPE).stdout 

for line in pipe: 
    print(":", line.decode(), end="") 

I know a standard module exists for this; it is only meant as an example. You can read the entire output in one go with the communicate method (like shell backticks), but obviously you have to be careful about memory size.
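
For instance, the one-shot variant mentioned above might look like this (a sketch; the whole decompressed output lands in memory at once):

from subprocess import Popen, PIPE 

cmd = ['gzip', '-dc', 'compressed_file.gz'] 
out = Popen(cmd, stdout=PIPE).communicate()[0]  # entire output, like shell backticks 
print(out.decode(), end="") 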

Here is an example of writing records to the lp(1) program on Linux (Python 3 again):

from subprocess import Popen, PIPE 

cmd = ['lp', '-'] 
proc = Popen(cmd, stdin=PIPE) 
proc.communicate(some_data.encode()) 
+0

This is the standard example you find everywhere. The point is that I don't want the input to come from another process, and I want to avoid writing all the input into memory before sending it to the consumer... Passing everything to proc.communicate in one go would of course solve the problem... – 2011-05-06 14:08:37

0

Now, I know this is not going to completely satisfy your purism, since the input has to fit in memory and you have no option to work interactively with input/output, but at least it works fine on your example. The communicate method can optionally take the input as an argument, and if you feed the process its input this way, it works.

import subprocess 

proc = subprocess.Popen(['cat','-'], 
         stdin=subprocess.PIPE, 
         stdout=subprocess.PIPE, 
         ) 

input = "".join('{0:d}\n'.format(i) for i in range(100000)) 
output = proc.communicate(input)[0] 
print output 

As for the larger problem, you can subclass Popen, rewrite `__init__` to accept stream-like objects as the arguments for stdin, stdout and stderr, and rewrite the `_communicate` method (hairy cross-platform; you need to do it twice, see the subprocess.py source) to call read() on the stdin stream and write() the output to the stdout and stderr streams. What bothers me about this approach is that, as far as I know, it has not already been done. When obvious things have not been done before there is usually a reason (it doesn't work as expected), but I can't think of why it shouldn't, apart from the fact that the streams need to be thread-safe on Windows.
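
A rough sketch of that idea is possible with public APIs only, using composition and a pumping thread instead of overriding the private `_communicate` (the helper name `stream_communicate` is made up for this example):

import shutil 
import subprocess 
import threading 

# Hypothetical helper: stream in_stream through a child process into 
# out_stream without holding everything in memory. 
def stream_communicate(args, in_stream, out_stream, chunk_size=64 * 1024): 
    proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE) 

    def pump(): 
        # close stdin when done so the child sees EOF, even on error 
        with proc.stdin: 
            shutil.copyfileobj(in_stream, proc.stdin, chunk_size) 

    writer = threading.Thread(target=pump) 
    writer.start() 
    shutil.copyfileobj(proc.stdout, out_stream, chunk_size)  # drain concurrently 
    writer.join() 
    return proc.wait() 

For example, `stream_communicate(['cat', '-'], open('in.txt', 'rb'), open('out.txt', 'wb'))` would stream a file through `cat` without buffering it all.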

4

If you don't want to keep all the data in memory, you have to use select. E.g. something like this:

import subprocess 
from select import select 
import os 

proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) 

i = 0 
while True: 
    rlist, wlist, xlist = [proc.stdout], [], [] 
    if i < 100000: 
        wlist.append(proc.stdin) 
    rlist, wlist, xlist = select(rlist, wlist, xlist) 
    if proc.stdout in rlist: 
        out = os.read(proc.stdout.fileno(), 10) 
        print out, 
        if not out: 
            break 
    if proc.stdin in wlist: 
        proc.stdin.write('%d\n' % i) 
        i += 1 
        if i >= 100000: 
            proc.stdin.close() 
+0

Yes, this would be the conceptually correct solution. Perhaps a bit convoluted, but if Popen does not implement these patterns out of the box, this is how I would implement it... – 2011-05-06 15:51:09

+2

I don't think it is implemented out of the box, because usually when you need to resort to this, you also need fine-grained control over the poll/select loop. Have you checked the ['asyncore'](http://docs.python.org/library/asyncore.html) module? – 2011-05-06 16:14:00

+2

I found this interesting blog post: http://dcreager.net/2009/08/13/subprocess-callbacks/ – 2011-05-06 16:24:20

2

Here is what I used to load a 6 GB MySQL dump file via subprocess. Stay away from shell=True: it is insecure, and starting an extra shell process wastes resources.

import subprocess 

fhandle = None 

cmd = [mysql_path, 
       "-u", mysql_user, "-p" + mysql_pass, 
       "-h", host, database] 

fhandle = open(dump_file, 'r') 
p = subprocess.Popen(cmd, stdin=fhandle, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 

(stdout, stderr) = p.communicate() 

fhandle.close() 
0

Using aiofiles & asyncio in Python 3.5:

A bit complicated, but you only need 1024 bytes of memory to write to stdin!

import asyncio 
import aiofiles 
import sys 
from os.path import dirname, join, abspath 
import subprocess as sb 


THIS_DIR = abspath(dirname(__file__)) 
SAMPLE_FILE = join(THIS_DIR, '../src/hazelnut/tests/stuff/sample.mp4') 
DEST_PATH = '/home/vahid/Desktop/sample.mp4' 


async def async_file_reader(f, buffer): 
    async for l in f: 
        if l: 
            buffer.append(l.decode())  # the pipe yields bytes; store text so join() works 
        else: 
            break 
    print('reader done') 


async def async_file_writer(source_file, target_file): 
    length = 0 
    while True: 
        input_chunk = await source_file.read(1024) 
        if input_chunk: 
            length += len(input_chunk) 
            target_file.write(input_chunk) 
            await target_file.drain() 
        else: 
            target_file.write_eof() 
            break 

    print('writer done: %s' % length) 


async def main(): 
    dir_name = dirname(DEST_PATH) 
    remote_cmd = 'ssh localhost mkdir -p %s && cat - > %s' % (dir_name, DEST_PATH) 

    stdout, stderr = [], [] 
    async with aiofiles.open(SAMPLE_FILE, mode='rb') as f: 
        cmd = await asyncio.create_subprocess_shell(
            remote_cmd, 
            stdin=sb.PIPE, 
            stdout=sb.PIPE, 
            stderr=sb.PIPE, 
        ) 

        await asyncio.gather(*(
            async_file_reader(cmd.stdout, stdout), 
            async_file_reader(cmd.stderr, stderr), 
            async_file_writer(f, cmd.stdin) 
        )) 

        print('EXIT STATUS: %s' % await cmd.wait()) 

    stdout, stderr = '\n'.join(stdout), '\n'.join(stderr) 

    if stdout: 
        print(stdout) 

    if stderr: 
        print(stderr, file=sys.stderr) 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(main()) 

Result:

writer done: 383631 
reader done 
reader done 
EXIT STATUS: 0 
1

Your code deadlocks as soon as cat's stdout OS pipe buffer is full. If you use stdout=PIPE, you have to consume the output in time, otherwise a deadlock may happen.

If you don't need the output while the process is running, you can redirect it to a temporary file:

#!/usr/bin/env python3 
import subprocess 
import tempfile 

with tempfile.TemporaryFile('r+') as output_file: 
    with subprocess.Popen(['cat'], 
                          stdin=subprocess.PIPE, 
                          stdout=output_file, 
                          universal_newlines=True) as process: 
        for i in range(100000): 
            print(i, file=process.stdin) 
    output_file.seek(0)  # rewind (and sync with the disk) 
    print(output_file.readline(), end='')  # get the first line of the output 

If the input/output is small (fits in memory), you can pass all the input at once and get the output in one go using .communicate(), which does the concurrent reading/writing for you:

#!/usr/bin/env python3 
import subprocess 

cp = subprocess.run(['cat'], input='\n'.join(['%d' % i for i in range(100000)]), 
        stdout=subprocess.PIPE, universal_newlines=True) 
print(cp.stdout.splitlines()[-1]) # print the last line 

To read/write concurrently by hand, you can use threads, asyncio, fcntl, etc. @Jed provided a simple thread-based solution. Here is an asyncio-based solution:

#!/usr/bin/env python3 
import asyncio 
import sys 
from subprocess import PIPE 

async def pump_input(writer): 
    try: 
        for i in range(100000): 
            writer.write(b'%d\n' % i) 
            await writer.drain() 
    finally: 
        writer.close() 

async def run(): 
    # start child process 
    # NOTE: universal_newlines parameter is not supported 
    process = await asyncio.create_subprocess_exec('cat', stdin=PIPE, stdout=PIPE) 
    asyncio.ensure_future(pump_input(process.stdin))  # write input 
    async for line in process.stdout:  # consume output 
        print(int(line)**2)  # print squares 
    return await process.wait()  # wait for the child process to exit 


if sys.platform.startswith('win'): 
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows 
    asyncio.set_event_loop(loop) 
else: 
    loop = asyncio.get_event_loop() 
loop.run_until_complete(run()) 
loop.close() 

On Unix, you could use an fcntl-based solution:

#!/usr/bin/env python3 
import sys 
from fcntl import fcntl, F_GETFL, F_SETFL 
from os import O_NONBLOCK 
from shutil import copyfileobj 
from subprocess import Popen, PIPE, _PIPE_BUF as PIPE_BUF 

def make_blocking(pipe, blocking=True): 
    fd = pipe.fileno() 
    if not blocking: 
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK)  # set O_NONBLOCK 
    else: 
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK)  # clear it 


with Popen(['cat'], stdin=PIPE, stdout=PIPE) as process: 
    make_blocking(process.stdout, blocking=False) 
    with process.stdin: 
        for i in range(100000): 
            # NOTE: the mode is block-buffered (default) and therefore 
            # `cat` won't see it immediately 
            process.stdin.write(b'%d\n' % i) 
            # a deadlock may happen here with a *blocking* pipe 
            output = process.stdout.read(PIPE_BUF) 
            if output is not None: 
                sys.stdout.buffer.write(output) 
    # read the rest 
    make_blocking(process.stdout) 
    copyfileobj(process.stdout, sys.stdout.buffer) 
0

The simplest approach I can think of:

from subprocess import Popen, PIPE 
from threading import Thread 

s = map(str, xrange(10000))  # a large list of strings 
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=1) 
Thread(target=lambda: any((p.stdin.write(b) for b in s)) or p.stdin.close()).start() 
print (p.stdout.read()) 

A buffered version:

from subprocess import Popen, PIPE 
from threading import Thread 

s = ''.join(map(str, xrange(10000)))  # a large string, so it can be sliced into chunks 
n = 1024  # buffer size 
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=n) 
Thread(target=lambda: any((p.stdin.write(c) for c in (s[i:i+n] for i in xrange(0, len(s), n)))) or p.stdin.close()).start() 
print (p.stdout.read()) 
0

I was looking for example code that iterates over a process's output incrementally while this process consumes its input from a provided iterator (also incrementally). Basically:

import string 
import random 

# That's what I consider a very useful function, though I didn't 
# find any existing implementations. 
def process_line_reader(args, stdin_lines): 
    # args - command to run, same as subprocess.Popen 
    # stdin_lines - iterable with lines to send to process stdin 
    # returns - iterable with lines received from process stdout 
    pass 

# Returns iterable over n random strings. n is assumed to be infinity if negative. 
# Just an example of a function that returns a potentially unlimited number of lines. 
def random_lines(n, M=8): 
    while 0 != n: 
        yield "".join(random.choice(string.letters) for _ in range(M)) 
        if 0 < n: 
            n -= 1 

# That's what I consider to be a very convenient use case for the 
# function proposed above. 
def print_many_uniq_numbered_random_lines(): 
    i = 0 
    for line in process_line_reader(["uniq", "-i"], random_lines(100500 * 9000)): 
        # Key idea here is that `process_line_reader` will feed random lines into 
        # `uniq` process stdin as lines are consumed from the returned iterable. 
        print "#%i: %s" % (i, line) 
        i += 1 

Some of the solutions suggested here allow doing this with threads (which is not always convenient) or with asyncio (which is not available in Python 2.x). Below is a working implementation example that allows doing this.

import subprocess 
import os 
import fcntl 
import select 
import errno 

class nonblocking_io(object): 
    def __init__(self, f): 
        self._fd = -1 
        if type(f) is int: 
            self._fd = os.dup(f) 
            os.close(f) 
        elif type(f) is file: 
            self._fd = os.dup(f.fileno()) 
            f.close() 
        else: 
            raise TypeError("Only accept file objects or integer file descriptors") 
        flag = fcntl.fcntl(self._fd, fcntl.F_GETFL) 
        fcntl.fcntl(self._fd, fcntl.F_SETFL, flag | os.O_NONBLOCK) 
    def __enter__(self): 
        return self 
    def __exit__(self, type, value, traceback): 
        self.close() 
        return False 
    def fileno(self): 
        return self._fd 
    def close(self): 
        if 0 <= self._fd: 
            os.close(self._fd) 
            self._fd = -1 

class nonblocking_line_writer(nonblocking_io): 
    def __init__(self, f, lines, autoclose=True, buffer_size=16*1024, encoding="utf-8", linesep=os.linesep): 
        super(nonblocking_line_writer, self).__init__(f) 
        self._lines = iter(lines) 
        self._lines_ended = False 
        self._autoclose = autoclose 
        self._buffer_size = buffer_size 
        self._buffer_offset = 0 
        self._buffer = bytearray() 
        self._encoding = encoding 
        self._linesep = bytearray(linesep, encoding) 
    # Returns False when `lines` iterable is exhausted and all pending data is written 
    def continue_writing(self): 
        while True: 
            if self._buffer_offset < len(self._buffer): 
                n = os.write(self._fd, self._buffer[self._buffer_offset:]) 
                self._buffer_offset += n 
                if self._buffer_offset < len(self._buffer): 
                    return True 
            if self._lines_ended: 
                if self._autoclose: 
                    self.close() 
                return False 
            self._buffer[:] = [] 
            self._buffer_offset = 0 
            while len(self._buffer) < self._buffer_size: 
                line = next(self._lines, None) 
                if line is None: 
                    self._lines_ended = True 
                    break 
                self._buffer.extend(bytearray(line, self._encoding)) 
                self._buffer.extend(self._linesep) 

class nonblocking_line_reader(nonblocking_io): 
    def __init__(self, f, autoclose=True, buffer_size=16*1024, encoding="utf-8"): 
        super(nonblocking_line_reader, self).__init__(f) 
        self._autoclose = autoclose 
        self._buffer_size = buffer_size 
        self._encoding = encoding 
        self._file_ended = False 
        self._line_part = "" 
    # Returns (lines, more) tuple, where lines is iterable with lines read and more will 
    # be set to False after EOF. 
    def continue_reading(self): 
        lines = [] 
        while not self._file_ended: 
            data = os.read(self._fd, self._buffer_size) 
            if 0 == len(data): 
                self._file_ended = True 
                if self._autoclose: 
                    self.close() 
                if 0 < len(self._line_part): 
                    lines.append(self._line_part.decode(self._encoding)) 
                    self._line_part = "" 
                break 
            for line in data.splitlines(True): 
                self._line_part += line 
                if self._line_part.endswith(("\n", "\r")): 
                    lines.append(self._line_part.decode(self._encoding).rstrip("\n\r")) 
                    self._line_part = "" 
            if len(data) < self._buffer_size: 
                break 
        return (lines, not self._file_ended) 

class process_line_reader(object): 
    def __init__(self, args, stdin_lines): 
        self._p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE) 
        self._reader = nonblocking_line_reader(self._p.stdout) 
        self._writer = nonblocking_line_writer(self._p.stdin, stdin_lines) 
        self._iterator = self._communicate() 
    def __iter__(self): 
        return self._iterator 
    def __enter__(self): 
        return self._iterator 
    def __exit__(self, type, value, traceback): 
        self.close() 
        return False 
    def _communicate(self): 
        read_set = [self._reader] 
        write_set = [self._writer] 
        while read_set or write_set: 
            try: 
                rlist, wlist, xlist = select.select(read_set, write_set, []) 
            except select.error, e: 
                if e.args[0] == errno.EINTR: 
                    continue 
                raise 
            if self._reader in rlist: 
                stdout_lines, more = self._reader.continue_reading() 
                for line in stdout_lines: 
                    yield line 
                if not more: 
                    read_set.remove(self._reader) 
            if self._writer in wlist: 
                if not self._writer.continue_writing(): 
                    write_set.remove(self._writer) 
        self.close() 
    def lines(self): 
        return self._iterator 
    def close(self): 
        if self._iterator is not None: 
            self._reader.close() 
            self._writer.close() 
            self._p.wait() 
            self._iterator = None 