我有一個使用subprocess.Popen調用的可執行文件。然後,我打算通過stdin使用從隊列中讀取其值的線程爲它提供一些數據,該隊列稍後將填充到另一個線程中。應該使用另一個線程中的stdout管道讀取輸出,並再次將其排序到Queue中。python:讀取線程中的子進程輸出
據我以前的研究所瞭解,使用Queue線程是一種很好的做法。
不幸的是,外部可執行文件不會很快爲我提供每個輸入行的答案,因此簡單的寫入,讀取循環不是一個選項。可執行文件實現了一些內部的多線程,並且我希望輸出一旦可用,因此需要額外的讀取器線程。
至於測試的可執行文件將只是洗牌的每一行(shuffleline.py)的例子:
#!/usr/bin/python -u
import sys
from random import shuffle
for line in sys.stdin:
line = line.strip()
# shuffle line
line = list(line)
shuffle(line)
line = "".join(line)
sys.stdout.write("%s\n"%(line))
sys.stdout.flush() # avoid buffers
請注意,這已經是儘可能的緩衝。或者不是嗎?這是我的精簡測試程序:
#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess
class WriteThread(threading.Thread):
def __init__(self, p_in, source_queue):
threading.Thread.__init__(self)
self.pipe = p_in
self.source_queue = source_queue
def run(self):
while True:
source = self.source_queue.get()
print "writing to process: ", repr(source)
self.pipe.write(source)
self.pipe.flush()
self.source_queue.task_done()
class ReadThread(threading.Thread):
def __init__(self, p_out, target_queue):
threading.Thread.__init__(self)
self.pipe = p_out
self.target_queue = target_queue
def run(self):
while True:
line = self.pipe.readline() # blocking read
if line == '':
break
print "reader read: ", line.rstrip()
self.target_queue.put(line)
if __name__ == "__main__":
cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
source_queue = Queue.Queue()
target_queue = Queue.Queue()
writer = WriteThread(proc.stdin, source_queue)
writer.setDaemon(True)
writer.start()
reader = ReadThread(proc.stdout, target_queue)
reader.setDaemon(True)
reader.start()
# populate queue
for i in range(10):
source_queue.put("string %s\n" %i)
source_queue.put("")
print "source_queue empty: ", source_queue.empty()
print "target_queue empty: ", target_queue.empty()
import time
time.sleep(2) # expect some output from reader thread
source_queue.join() # wait until all items in source_queue are processed
proc.stdin.close() # should end the subprocess
proc.wait()
這給下面的輸出(python2.7):
writing to process: 'string 0\n'
writing to process: 'string 1\n'
writing to process: 'string 2\n'
writing to process: 'string 3\n'
writing to process: 'string 4\n'
writing to process: 'string 5\n'
writing to process: 'string 6\n'
source_queue empty: writing to process: 'string 7\n'
writing to process: 'string 8\n'
writing to process: 'string 9\n'
writing to process: ''
True
target_queue empty: True
然後什麼2秒......
reader read: rgsn0i t
reader read: nrg1sti
reader read: tis n2rg
reader read: snt gri3
reader read: nsri4 tg
reader read: stir5 gn
reader read: gnri6ts
reader read: ngrits7
reader read: 8nsrt ig
reader read: sg9 nitr
交織預計在一開始。但是子過程的輸出在子過程結束之後纔會出現,直到。隨着更多的線路輸入,我得到了一些輸出,因此我假設標準輸出管道存在緩存問題。根據在這裏發佈的其他問題,flush stdout(在子進程中)應該工作,至少在Linux上。
謝謝,這是解決方案! – muckl 2012-03-21 21:33:49
請問爲什麼子進程和線程的混合是如此糟糕的做法?看起來比在沒有任何事情發生時反覆調用非阻塞I/O更優雅。顯然這些線程不應該訪問任何非線程安全的數據結構,但是隻能從Queue讀寫數據似乎是安全的。 對於像我這樣簡單的情況,Python3.2 backport的變化是否很重要? – muckl 2012-03-21 21:41:24
線程和子進程的問題具體是混合線程和分叉的問題。請參閱http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-the-和其他此類文章。 Python 3.2的子進程backport可以解決這些問題。至於一般的線程,主要問題是它們很難控制和調試。例如,你不能從線程的「外部」殺死它們,所以如果一個線程停留在讀取或寫入中,那麼你就無能爲力。 – 2012-03-21 22:29:08