2017-06-21 98 views
0

我正在使用Python3,Flask 0.12和Pytest 3.0.7。Pytest在燒瓶服務器運行另一個線程時掛起

我有一個類似的燒瓶應用:

class AppInitializer: 
    def __init__(self): 
     pass 

    @staticmethod 
    def __function_to_be_refreshed(): 
     while True: 
      try: 
       time.sleep(5) 
      except Exception as e: 
       logger.exception(e) 


    def create_app(self): 
     daemon_thread = threading.Thread(target=self.__function_to_be_refreshed) 
     daemon_thread.daemon = True 
     daemon_thread.start() 
     atexit.register(daemon_thread.join) 
     app_ = Flask(__name__) 
     return app_ 


app_initializer = AppInitializer() 
app = app_initializer.create_app() 

我想如下測試使用pytest這個程序:

import unittest 

import pytest 


class TestCheckPriceRequestAPI(unittest.TestCase): 
    def setUp(self): 
     self.app = api.app.test_client() 

    def test_random(self): 
     pass 

當我運行使用pytest,該測試(此測試以及所有其他測試)成功運行,但pytest掛起。如何停止正在運行的pytest進程(或者殺死守護進程線程)?

+0

好燒瓶中的應用程序有沒有業務產卵線程 – e4c5

+0

@ e4c5:這是在項目中添加的需求。 – tusharmakkar08

回答

2

join命令只意味着線程將等待直到線程結束,但不會完成它。結束與無限循環的線程,你可以這樣做:

class AppInitializer: 
    def __init__(self): 
     self._run = True 
     self._daemon_thread = None 

    def __function_to_be_refreshed(self): 
     while self._run: 
      try: 
       time.sleep(5) 
      except Exception as e: 
       logger.exception(e) 

    def __shutdown(self): 
     self._run = False 
     if self._daemon_thread: 
      self._daemon_thread.join() 

    def create_app(self): 
     self._daemon_thread = threading.Thread(target=self.__function_to_be_refreshed) 
     ... 
     atexit.register(self.__shutdown) 
     ... 
+0

我在完成一個線程和終止之間感到困惑。感謝您的建議:) – tusharmakkar08

相關問題