我需要每秒調用一次任務(例如)來輪詢某個硬件上的某些傳感器數據。在單元測試中,我想要做的就是檢查是否調用了正確的方法,並且錯誤(例如傳感器已被炸燬或消失)確實被捕獲。在asyncio中測試永久運行的任務
這裏是一個玩具例如模仿真正的代碼:
import pytest
import asyncio
import mock
async def ook(func):
while True:
await asyncio.sleep(1)
func()
@pytest.mark.asyncio
async def test_ook():
func = mock.Mock()
await ook(func)
assert func.called is True
正如預期的那樣,運行此將永遠阻塞。
如何取消ook
任務,以便單元測試不會阻止?
解決方法是將循環拆分爲另一個函數並將其定義爲不可測試。我想避免這樣做。請注意,與func
搞混(要打電話loop.close()
或其他一些)不起作用,因爲它只是在那裏玩具示例測試可以斷言的東西。