我有一個有2個端點的API,一個是接收JSON的簡單文章,另一個是根據jsons列表長度多次調用第一個端點並將返回保存到列表的端點。如何在python中多線程調用另一個端點方法?
第一種方法
@app.route('/getAudience', methods=['POST', 'OPTIONS'])
def get_audience(audience_=None):
try:
if audience_:
audience = audience_
else:
audience = request.get_json()
except (BadRequest, ValueError):
return make_response(jsonify(exception_response), 500)
return get_audience_response(audience, exception_response)
方法二
@app.route('/getMultipleAudience', methods=['POST', 'OPTIONS'])
def get_multiple_audience():
try:
audiences = request.json
except (BadRequest, ValueError):
return make_response(jsonify(exception_response), 500)
response = []
for audience in audiences:
new_resp = json.loads(get_audience(audience).data)
response.append(new_resp)
return make_response(jsonify(response))
我想叫第一種方法在第二種方法的啓動列表中每個對象的線程,所以我嘗試這樣做:
def get_multiple_audience():
with app.app_context():
try:
audiences = request.get_json()
except (BadRequest, ValueError):
return make_response(jsonify(exception_response), 500)
for audience in audiences:
thread = Thread(target=get_audience, args=audience)
thread.start()
thread.join()
return make_response(jsonify(response))
並得到這個錯誤:
Exception in thread Thread-6:
Traceback (most recent call last):
File "C:\Python27\lib\threading.py", line 801, in __bootstrap_inner
self.run()
File "C:\Python27\lib\threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "C:\Python27\lib\site-packages\flask_cors\decorator.py", line 123, in wrapped_function
options = get_cors_options(current_app, _options)
File "C:\Python27\lib\site-packages\flask_cors\core.py", line 286, in get_cors_options
options.update(get_app_kwarg_dict(appInstance))
File "C:\Python27\lib\site-packages\flask_cors\core.py", line 299, in get_app_kwarg_dict
app_config = getattr(app, 'config', {})
File "C:\Python27\lib\site-packages\werkzeug\local.py", line 347, in __getattr__
return getattr(self._get_current_object(), name)
File "C:\Python27\lib\site-packages\werkzeug\local.py", line 306, in _get_current_object
return self.__local()
File "C:\Python27\lib\site-packages\flask\globals.py", line 51, in _find_app
raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.
於是我試圖修改這樣的第一種方法:
@app.route('/getAudience', methods=['POST', 'OPTIONS'])
def get_audience(audience_=None):
with app.app_context():
try:
...
,並得到了同樣的錯誤。任何人都可以給我一個提示,建議,最佳實踐或解決方案嗎?
如果要測試您的Web服務,最好的方法是使用請求創建單獨/獨立的應用程序。 –
@LaurentLAPORTE這不是他們所問的 – davidism