2015-12-02 100 views
0
@asyncio.coroutine 
    def listener(): 
     while True: 
      message = yield from websocket.recieve_message() 
      if message: 
       yield from handle(message) 

loop = asyncio.get_event_loop() 
loop.run_until_complete(listener()) 

假設我使用帶有asyncio的websockets。這意味着我收到來自websockets的消息。當我收到一條消息時,我想處理這條消息,但是我用我的代碼丟失了所有的異步事物。因爲yield from handle(message)正在定義阻塞...我怎麼能找到一種方法使其非阻塞?就像,在同一時間處理多個消息。在我可以處理另一條消息之前,不必等待消息被處理。Asyncio和無限循環

謝謝。

+2

通常你需要每個websocket讀取任務,寫入websocket可能會與其他任務異步執行。 「手柄」也可能是單獨的任務。 你的代碼不完整,所以你很難得到你需要的東西。 –

+0

如果這是你調用的唯一協程,那麼監聽器會阻塞,因爲它會無限運行(因爲while循環)。如果你有另一個同時運行的協同程序(從語句中產生它自己的收益),那麼asyncio將在語句收益之間來回反彈,因此它將不再是「阻塞」的。 – shongololo

回答

2

如果你不關心從手柄的消息的返回值,你可以簡單地爲它創建一個新的任務,將在事件循環旁邊的WebSocket的閱讀器上運行。下面是一個簡單的例子:

@asyncio.coroutine 
def listener(): 
    while True: 
     message = yield from websocket.recieve_message() 
     if message: 
      asyncio.ensure_future(handle(message)) 

ensure_future將創建一個任務,並將其連接到默認的事件循環。由於循環已經在運行,它將與您的websocket閱讀器並行處理。事實上,如果它是一個緩慢運行的I/O阻塞任務(如發送電子郵件),您可以輕鬆地同時運行幾十個句柄(消息)任務。它們是在需要時動態創建的,並在完成時銷燬(比產生線程的開銷低得多)。

如果您想要更多的控制權,您可以簡單地在讀取器中寫入一個asyncio.Queue,並擁有一個固定大小的任務池,它可以消耗隊列,這是多線程或多進程中的典型模式節目。

@asyncio.coroutine 
def consumer(queue): 
    while True: 
     message = yield from queue.get() 
     yield from handle(message) 

@asyncio.coroutine 
def listener(queue): 
    for i in range(5): 
     asyncio.ensure_future(consumer(queue)) 
    while True: 
     message = yield from websocket.recieve_message() 
     if message: 
      yield from q.put(message) 

q = asyncio.Queue() 
loop = asyncio.get_event_loop() 
loop.run_until_complete(listener(q))