2017-08-16 104 views
1

如何使用asyncio在兩個協程之間實現管道,一個從流中讀取,另一個寫入其中?如何在兩個Python asyncio協程之間使用讀/寫流?

假設我們有這個現有的代碼,兩個簡單的腳本。一個產生到stdout:

# produce.py 

import asyncio 
import random 
import sys 

async def produce(stdout): 
    for i in range(10000): 
     await asyncio.sleep(random.randint(0, 3)) 
     print(i, file=stdout, flush=True) 

if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(produce(sys.stdout)) 
    loop.close() 

以及從標準輸入讀取其他:

# consume.py 

async def consume(loop, stdin): 
    reader = asyncio.StreamReader(loop=loop) 
    reader_protocol = asyncio.StreamReaderProtocol(reader) 
    await loop.connect_read_pipe(lambda: reader_protocol, stdin) 

    while True: 
     line = await reader.readline() 
     if not line: 
      break 
     print(int(line) ** 2) 

if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(consume(loop, sys.stdin)) 
    loop.close() 

很明顯,因爲我們的兩件可以在命令行單獨運行,我們可以使用subprocess模塊殼管(produce | consume)。

但是我們希望在Python中實現等價的Unix管道,即連接這兩個現有函數的流。

像這樣的東西是行不通的:

pipe = io.BytesIO() 

await asyncio.gather(produce(pipe), 
        consume(loop, pipe)) 

如果這兩個功能將操作發電機,我們可以寫這樣的事情(蟒蛇3.6):

async def produce(): 
    for i in range(10000): 
     await asyncio.sleep(random.randint(0, 3)) 
     yield str(i) 


async def consume(generator): 
    async for value in generator: 
     print(int(value) ** 2) 


if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(consume(produce())) 
    loop.close() 

有一些地方asyncio API會允許這樣嗎?

謝謝!

回答

2

解決這個的辦法,就是把你的當前功能爲發電機,並寫一些包裝與UNIX管道揭露他們:

# wrapper.py 

import asyncio 
import random 
import sys 


async def produce(): 
    for i in range(10000): 
     await asyncio.sleep(random.randint(0, 3)) 
     yield str(i) 


async def consume(generator): 
    async for value in generator: 
     print(int(value) ** 2) 


async def system_out_generator(loop, stdout, generator): 
    async for line in generator: 
     print(line, file=stdout, flush=True) 


async def system_in_generator(loop, stdin): 
    reader = asyncio.StreamReader(loop=loop) 
    reader_protocol = asyncio.StreamReaderProtocol(reader) 
    await loop.connect_read_pipe(lambda: reader_protocol, stdin) 
    while True: 
     line = await reader.readline() 
     if not line: 
      break 
     yield line 


async def main(loop): 
    try: 
     if sys.argv[1] == "produce": 
      await system_out_generator(loop, sys.stdout, produce()) 
     elif sys.argv[1] == "consume": 
      await consume(system_in_generator(loop, sys.stdin)) 
    except IndexError: 
     await consume(produce()) 


if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(main(loop)) 

您可以使用:

python wrapper.py # Python generators 

或:

python wrapper.py produce | python wrapper.py consume # System pipes 
+0

這假定流數據是換行符分隔的文本。 –

+0

在我們的情況下,這是總是得到一個完整的路線,但我想你可以使用'read(1024)',而不是如果你想要一個二進制流。 – Natim

相關問題