2012-03-21 117 views
9

我有一個使用subprocess.Popen調用的可執行文件。然後,我打算通過stdin使用從隊列中讀取其值的線程爲它提供一些數據,該隊列稍後將填充到另一個線程中。應該使用另一個線程中的stdout管道讀取輸出,並再次將其排序到Queue中。python:讀取線程中的子進程輸出

據我以前的研究所瞭解,使用Queue線程是一種很好的做法。

不幸的是,外部可執行文件不會很快爲我提供每個輸入行的答案,因此簡單的寫入,讀取循環不是一個選項。可執行文件實現了一些內部的多線程,並且我希望輸出一旦可用,因此需要額外的讀取器線程。

至於測試的可執行文件將只是洗牌的每一行(shuffleline.py)的例子:

#!/usr/bin/python -u 
import sys 
from random import shuffle 

for line in sys.stdin: 
    line = line.strip() 

    # shuffle line 
    line = list(line) 
    shuffle(line) 
    line = "".join(line) 

    sys.stdout.write("%s\n"%(line)) 
    sys.stdout.flush() # avoid buffers 

請注意,這已經是儘可能的緩衝。或者不是嗎?這是我的精簡測試程序:

#!/usr/bin/python -u 
import sys 
import Queue 
import threading 
import subprocess 

class WriteThread(threading.Thread): 
    def __init__(self, p_in, source_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_in 
     self.source_queue = source_queue 

    def run(self): 
     while True: 
      source = self.source_queue.get() 
      print "writing to process: ", repr(source) 
      self.pipe.write(source) 
      self.pipe.flush() 
      self.source_queue.task_done() 

class ReadThread(threading.Thread): 
    def __init__(self, p_out, target_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_out 
     self.target_queue = target_queue 

    def run(self): 
     while True: 
      line = self.pipe.readline() # blocking read 
      if line == '': 
       break 
      print "reader read: ", line.rstrip() 
      self.target_queue.put(line) 

if __name__ == "__main__": 

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered 
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE) 

    source_queue = Queue.Queue() 
    target_queue = Queue.Queue() 

    writer = WriteThread(proc.stdin, source_queue) 
    writer.setDaemon(True) 
    writer.start() 

    reader = ReadThread(proc.stdout, target_queue) 
    reader.setDaemon(True) 
    reader.start() 

    # populate queue 
    for i in range(10): 
     source_queue.put("string %s\n" %i) 
    source_queue.put("") 

    print "source_queue empty: ", source_queue.empty() 
    print "target_queue empty: ", target_queue.empty() 

    import time 
    time.sleep(2) # expect some output from reader thread 

    source_queue.join() # wait until all items in source_queue are processed 
    proc.stdin.close() # should end the subprocess 
    proc.wait() 

這給下面的輸出(python2.7):

writing to process: 'string 0\n' 
writing to process: 'string 1\n' 
writing to process: 'string 2\n' 
writing to process: 'string 3\n' 
writing to process: 'string 4\n' 
writing to process: 'string 5\n' 
writing to process: 'string 6\n' 
source_queue empty: writing to process: 'string 7\n' 
writing to process: 'string 8\n' 
writing to process: 'string 9\n' 
writing to process: '' 
True 
target_queue empty: True 

然後什麼2秒......

reader read: rgsn0i t 
reader read: nrg1sti 
reader read: tis n2rg 
reader read: snt gri3 
reader read: nsri4 tg 
reader read: stir5 gn 
reader read: gnri6ts 
reader read: ngrits7 
reader read: 8nsrt ig 
reader read: sg9 nitr 

交織預計在一開始。但是子過程的輸出在子過程結束之後纔會出現,直到。隨着更多的線路輸入,我得到了一些輸出,因此我假設標準輸出管道存在緩存問題。根據在這裏發佈的其他問題,flush stdout(在子進程中)應該工作,至少在Linux上。

回答

7

您的問題無關的subprocess模塊,或線程(有問題的,因爲它們是),甚至是混合子進程和線程(一個非常糟糕的主意,不是使用線程更糟糕,開始時,除非你使用Python 3.2的子模塊的反向移植,你可以從code.google.com/p/python-subprocess32獲取)或者從多個線程訪問同樣的事情(如您的print報表做的。)

會發生什麼事是你shuffleline.py程序的緩衝區。不在輸出中,而在輸入中。雖然它不是很明顯,但是當您遍歷一個文件對象時,Python將以塊讀取,通常爲8k字節。由於sys.stdin是一個文件對象,你for循環將緩衝器中,直到EOF或完整塊:

for line in sys.stdin: 
    line = line.strip() 
    .... 

如果你想這樣做,無論是使用while循環調用sys.stdin.readline()(這對於EOF返回''):

while True: 
    line = sys.stdin.readline() 
    if not line: 
     break 
    line = line.strip() 
    ... 

,或者使用的iter()兩個參數的形式,它創建調用的第一個參數,直到所述第二參數(「定點」)的迭代器被返回:

for line in iter(sys.stdin.readline, ''): 
    line = line.strip() 
    ... 

如果我不建議不使用線程,而是使用子進程的管道上的非阻塞I/O,或者像twisted.reactor.spawnProcess那樣有很多鉤住進程和其他東西的方法作爲消費者和生產者。

+0

謝謝,這是解決方案! – muckl 2012-03-21 21:33:49

+1

請問爲什麼子進程和線程的混合是如此糟糕的做法?看起來比在沒有任何事情發生時反覆調用非阻塞I/O更優雅。顯然這些線程不應該訪問任何非線程安全的數據結構,但是隻能從Queue讀寫數據似乎是安全的。 對於像我這樣簡單的情況,Python3.2 backport的變化是否很重要? – muckl 2012-03-21 21:41:24

+3

線程和子進程的問題具體是混合線程和分叉的問題。請參閱http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-the-和其他此類文章。 Python 3.2的子進程backport可以解決這些問題。至於一般的線程,主要問題是它們很難控制和調試。例如,你不能從線程的「外部」殺死它們,所以如果一個線程停留在讀取或寫入中,那麼你就無能爲力。 – 2012-03-21 22:29:08