2016-09-07 46 views
13

函數glib.spawn_async允許您掛鉤在stdoutstderr上調用事件的三個​​回調,以及進程完成時。使用Popen模擬glib.spawn_async ...

如何使用subprocess與線程或asyncio模仿相同的功能?

我更感興趣的功能,而不是線程/ asynio但包含兩個答案將獲得賞金。

這裏是一個玩具程序,它顯示了我想做的事:

import glib 
import logging 
import os 
import gtk 


class MySpawn(object): 
    def __init__(self): 
     self._logger = logging.getLogger(self.__class__.__name__) 

    def execute(self, cmd, on_done, on_stdout, on_stderr): 
     self.pid, self.idin, self.idout, self.iderr = \ 
      glib.spawn_async(cmd, 
          flags=glib.SPAWN_DO_NOT_REAP_CHILD, 
          standard_output=True, 
          standard_error=True) 
     fout = os.fdopen(self.idout, "r") 
     ferr = os.fdopen(self.iderr, "r") 
     glib.child_watch_add(self.pid, on_done) 
     glib.io_add_watch(fout, glib.IO_IN, on_stdout) 
     glib.io_add_watch(ferr, glib.IO_IN, on_stderr) 
     return self.pid 


if __name__ == '__main__': 
    logging.basicConfig(format='%(thread)d %(levelname)s: %(message)s', 
         level=logging.DEBUG) 
    cmd = '/usr/bin/git ls-remote https://github.com/DiffSK/configobj'.split() 

    def on_done(pid, retval, *args): 
     logging.info("That's all folks!…") 

    def on_stdout(fobj, cond): 
     """This blocks which is fine for this toy example…""" 
     for line in fobj.readlines(): 
      logging.info(line.strip()) 
     return True 

    def on_stderr(fobj, cond): 
     """This blocks which is fine for this toy example…""" 
     for line in fobj.readlines(): 
      logging.error(line.strip()) 
     return True 

    runner = MySpawn() 
    runner.execute(cmd, on_done, on_stdout, on_stderr) 
    try: 
     gtk.main() 
    except KeyboardInterrupt: 
     print('') 

我要補充的是,由於readlines()擋住,上面會緩衝所有的輸出,並立刻發送。如果這不是您想要的,那麼您必須使用readline(),並確保在命令結束時讀完您之前未讀過的所有行。

回答

4

ASYNCIO有subprocess_exec,就沒有必要使用子模塊都:

import asyncio 

class Handler(asyncio.SubprocessProtocol): 
    def pipe_data_received(self, fd, data): 
     # fd == 1 for stdout, and 2 for stderr 
     print("Data from /bin/ls on fd %d: %s" % (fd, data.decode())) 

    def pipe_connection_lost(self, fd, exc): 
     print("Connection lost to /bin/ls") 

    def process_exited(self): 
     print("/bin/ls is finished.") 

loop = asyncio.get_event_loop() 
coro = loop.subprocess_exec(Handler, "/bin/ls", "/") 

loop.run_until_complete(coro) 
loop.close() 

隨着子和線程,這是很簡單。你可以只產卵每管螺紋,一到wait()的過程:

import subprocess 
import threading 

class PopenWrapper(object): 
    def __init__(self, args): 
     self.process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.DEVNULL) 

     self.stdout_reader_thread = threading.Thread(target=self._reader, args=(self.process.stdout,)) 
     self.stderr_reader_thread = threading.Thread(target=self._reader, args=(self.process.stderr,)) 
     self.exit_watcher = threading.Thread(target=self._exit_watcher) 

     self.stdout_reader_thread.start() 
     self.stderr_reader_thread.start() 
     self.exit_watcher.start() 

    def _reader(self, fileobj): 
     for line in fileobj: 
      self.on_data(fileobj, line) 

    def _exit_watcher(self): 
     self.process.wait() 
     self.stdout_reader_thread.join() 
     self.stderr_reader_thread.join() 
     self.on_exit() 

    def on_data(self, fd, data): 
     return NotImplementedError 

    def on_exit(self): 
     return NotImplementedError 

    def join(self): 
     self.process.wait() 

class LsWrapper(PopenWrapper): 
    def on_data(self, fd, data): 
     print("Received on fd %r: %s" % (fd, data)) 

    def on_exit(self): 
     print("Process exited.") 


LsWrapper(["/bin/ls", "/"]).join() 

但是,記住,油嘴不使用線程來執行asynchroneously您的回調。它使用事件循環,就像asyncio一樣。這個想法是,程序的核心是一個等待事件發生的循環,然後同步執行相關的回調。就你而言,這就是「數據可用於讀取其中一個管道」,並且「子進程已退出」。一般來說,它也有類似「X11服務器報告的鼠標移動」,「存在傳入的網絡流量」等。您可以通過編寫自己的事件循環來模擬glib的行爲。在兩個管道上使用select module。如果選擇報告管道可讀,但read不返回任何數據,則可能退出該過程 - 在這種情況下,調用子進程對象上的poll()方法來檢查它是否完成,並在出現回調(如果有)或錯誤否則回撥。

+0

非常感謝您花時間寫這個答案。 – Sardathrion

+1

請注意,上面的代碼會將'stdout'和'stderr'中的行緩存爲readlines()被阻塞。如果您想要更新,請使用'read()',但確保在讀取程序線程完成時清空緩衝區。 – Sardathrion