2013-03-03 54 views

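Chaining pipes with subprocess in Python, writing/reading line by line

I have the following code, which seems to work, for chaining pipes in subprocess with Python while reading from and writing to them line by line (without using communicate() up front). The code just calls a Unix command (mycmd), reads its output, writes that to the stdin of another Unix command (next_cmd), and redirects the output of that last command to a file.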

import subprocess
import sys

# some unix command that uses a pipe: command "a"
# writes to stdout and "b" reads it and writes to stdout
mycmd = "a | b"
mycmd_proc = subprocess.Popen(mycmd, shell=True,
                              stdin=sys.stdin,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
# nextCmd reads from stdin, and I'm passing it mycmd's output
next_cmd = "nextCmd -stdin"
output_file = open(output_filename, "w")
next_proc = subprocess.Popen(next_cmd, shell=True,
                             stdin=subprocess.PIPE,
                             stdout=output_file)
# read from the process object (mycmd_proc), not the command string (mycmd);
# b'' is the EOF sentinel (it equals '' on Python 2)
for line in iter(mycmd_proc.stdout.readline, b''):
    # do something with line
    # ...
    # write it to next command
    next_proc.stdin.write(line)
### If I wanted to call another command here that passes next_proc output
### line by line to another command, would I need
### to call next_proc.communicate() first?
next_proc.communicate()
output_file.close()

This seems to work, and it only calls communicate() at the end of the command.

I'd like to extend this code to add one more command, so that you can do:

mycmd1 | mycmd2 | mycmd3 > some_file 

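Meaning: read the output of mycmd1 line by line from Python, process each line, feed it to mycmd2, read mycmd2's output line by line, process it, and feed it to mycmd3, which in turn writes its output to some_file. Is this possible, or is it bound to end in deadlock, blocking, or unflushed buffers? Note that I'm not just calling the three Unix commands as a pipeline, because I want Python to sit in between and post-process each command's output line by line before feeding it to the next command.

I want to avoid calling communicate() and loading all the output into memory; instead I want to parse it line by line. Thanks.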

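Did you look at this example in the manual? http://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline – zigg 2013-03-03 20:34:18

@zigg: Sure, but it doesn't answer the question. I'm not just building a pipeline; I want to read from one pipe and write to another, not merely invoke Unix commands that happen to be piped together. It's important for me to know where reads/writes block and where buffering can occur. – user248237dfsf 2013-03-03 20:42:44

My apologies; you're right. I should have read your question more carefully. – zigg 2013-03-03 22:50:26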

Answers


This should handle an arbitrary number of commands:

import sys 
import subprocess 

def processFirst(out): 
    return out 

def processSecond(out): 
    return out 

def processThird(out): 
    return out 

commands = [("a|b", processFirst), ("nextCmd -stdin", processSecond), ("thirdCmd", processThird)] 

previous_output = None
for cmd, process_func in commands:
    # the first command reads from our stdin; later ones are fed through a pipe
    if previous_output is None:
        stdin = sys.stdin
    else:
        stdin = subprocess.PIPE
    proc = subprocess.Popen(cmd, shell=True,
                            stdin=stdin,
                            stdout=subprocess.PIPE)
    # communicate() writes the input (if any), closes stdin and reads all the
    # output, so a large payload cannot block halfway through a manual write
    out, err = proc.communicate(previous_output)
    previous_output = process_func(out)

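Just add any command you want to run to the command list, along with the function that should process its output. The output of the last command ends up in previous_output when the loop finishes.

To avoid any deadlock/buffering problems, you simply run each command to completion with proc.communicate(), which returns the output (rather than reading it directly as in your example). You then feed that output to the next command before letting that one run to completion, and so on.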
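To make the run-to-completion idea concrete, here is a minimal self-contained sketch (Python 3, POSIX line endings assumed). The `run_chain` and `py` helpers and the two child commands are hypothetical stand-ins for "a|b" and "nextCmd -stdin"; they are small Python one-liners so the example runs without any particular Unix tool installed.

```python
import subprocess
import sys

def run_chain(stages, data=b""):
    """Run each (argv, process_func) stage to completion, one after another."""
    for argv, process_func in stages:
        proc = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        # communicate() sends all input, closes stdin and collects all output
        out, _ = proc.communicate(data)
        # the Python step between two commands
        data = process_func(out)
    return data

def py(code):
    # hypothetical helper: run a snippet in a child Python interpreter
    return [sys.executable, "-c", code]

# stand-in commands (assumptions, not from the question)
upper = py("import sys; sys.stdout.write(sys.stdin.read().upper())")
reverse = py("import sys; sys.stdout.writelines(reversed(sys.stdin.readlines()))")

result = run_chain(
    [(upper, lambda out: out.replace(b"O", b"0")), (reverse, lambda out: out)],
    b"one\ntwo\n",
)
print(result)
```

Each stage holds its whole output in memory before the next one starts, which is exactly the cost the question wants to avoid for large inputs.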

Edit: I just noticed that you don't want to use communicate() up front and that you want to react line by line. I'll edit my answer shortly to address that.

This answer provides an example of how to read from a pipe line by line without blocking, using select.select().
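As a rough illustration of that technique on its own, the sketch below (Python 3, POSIX only, since select() on pipes is not available on Windows) waits on a single child's stdout with select.select() and splits whatever arrives into complete lines. The child command is a hypothetical stand-in that prints three lines.

```python
import os
import select
import subprocess
import sys

# stand-in child process (an assumption): prints three lines, unbuffered
child = [sys.executable, "-u", "-c", "for i in range(3): print('line %d' % i)"]
proc = subprocess.Popen(child, stdout=subprocess.PIPE)
fd = proc.stdout.fileno()

buf = b""
lines = []
while True:
    # wait up to 5 seconds for the pipe to become readable
    ready, _, _ = select.select([fd], [], [], 5.0)
    if not ready:
        continue  # nothing yet; a real program could do other work here
    chunk = os.read(fd, 4096)  # returns immediately with what is available
    if not chunk:
        break  # EOF: the child closed its stdout
    buf += chunk
    # everything before the last newline is complete; keep the remainder
    *complete, buf = buf.split(b"\n")
    lines.extend(complete)

proc.stdout.close()
proc.wait()
print(lines)
```

Because os.read() is only called after select() reports the descriptor as readable, the read itself does not block; partial lines simply stay in `buf` until the rest arrives.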

Here is an example that uses it for your specific case:

import sys 
import subprocess 
import select 
import os 

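class LineReader(object):
    def __init__(self, fd, process_func):
        self._fd = fd
        self._buf = b''  # bytes, because os.read() returns bytes
        self._process_func = process_func
        self.next_proc = None

    def fileno(self):
        # lets select.select() accept LineReader objects directly
        return self._fd

    def readlines(self):
        data = os.read(self._fd, 4096)
        if not data:
            # EOF: closing the next command's stdin lets it finish too
            # (a trailing partial line left in self._buf is discarded)
            if self.next_proc is not None:
                self.next_proc.stdin.close()
            return None
        self._buf += data
        if b'\n' not in data:
            return []
        tmp = self._buf.split(b'\n')
        # the last element is an incomplete line; keep it for the next read
        tmp_lines, self._buf = tmp[:-1], tmp[-1]
        lines = []
        for line in tmp_lines:
            lines.append(self._process_func(line))
            if self.next_proc is not None:
                # note: this write blocks if the next command's pipe is full
                self.next_proc.stdin.write(lines[-1] + b'\n')
                self.next_proc.stdin.flush()

        return lines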

def processFirst(line): 
    return line 

def processSecond(line): 
    return line 

def processThird(line): 
    return line 

commands = [("a|b", processFirst), ("nextCmd -stdin", processSecond), ("thirdCmd", processThird)] 

readers = []
previous_reader = None
for cmd, process_func in commands:
    if previous_reader is None:
        stdin = sys.stdin
    else:
        stdin = subprocess.PIPE
    proc = subprocess.Popen(cmd, shell=True,
                            stdin=stdin,
                            stdout=subprocess.PIPE)

    if previous_reader is not None:
        previous_reader.next_proc = proc

    previous_reader = LineReader(proc.stdout.fileno(), process_func)
    readers.append(previous_reader)


while readers:
    ready, _, _ = select.select(readers, [], [], 10.0)
    for stream in ready:
        lines = stream.readlines()
        if lines is None:
            readers.remove(stream)
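
One caveat with the loop above: the write to the next command's stdin is still a blocking call, so with enough data it can stall while that command's own output pipe is waiting to be drained. A possible alternative, offered here only as a sketch and not as part of the original answer, is to give every link between two commands its own thread. The `pump` and `py` helpers and the three child commands below are hypothetical stand-ins for mycmd1, mycmd2 and mycmd3 (Python 3, POSIX line endings assumed).

```python
import os
import subprocess
import sys
import tempfile
import threading

def pump(src, dst, func):
    """Copy lines from src to dst through func, then close dst to signal EOF."""
    for line in iter(src.readline, b""):
        dst.write(func(line))
        dst.flush()
    dst.close()
    src.close()

def py(code):
    # hypothetical helper: run a snippet in an unbuffered child interpreter
    return [sys.executable, "-u", "-c", code]

# stand-in commands (assumptions, not from the question)
cmd1 = py("for w in 'abc': print(w)")
cmd2 = py("import sys\nfor l in sys.stdin: print(l.strip() * 2)")
cmd3 = py("import sys\nfor l in sys.stdin: sys.stdout.write(l)")

with tempfile.NamedTemporaryFile(delete=False) as out_file:
    p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
    p2 = subprocess.Popen(cmd2, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    p3 = subprocess.Popen(cmd3, stdin=subprocess.PIPE, stdout=out_file)
    threads = [
        # Python step between mycmd1 and mycmd2: upper-case each line
        threading.Thread(target=pump, args=(p1.stdout, p2.stdin, bytes.upper)),
        # Python step between mycmd2 and mycmd3: prefix each line
        threading.Thread(target=pump, args=(p2.stdout, p3.stdin, lambda l: b"> " + l)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    for p in (p1, p2, p3):
        p.wait()

with open(out_file.name, "rb") as f:
    result = f.read()
os.remove(out_file.name)
print(result)
```

Since each link blocks only its own thread, a full pipe on one link does not stop the others from being drained, and no stage's output is ever held in memory as a whole.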