2017-08-24 172 views
1

我正在開發一個REST API,它應該接收JSON並啓動芹菜任務。這些芹菜任務使用Spark。我正在使用Flask框架將API和redis作爲代理消息運行。我發送一個請求時沒有錯誤,但是如果我發送一個請求而其他人沒有結束,則很可能會出現錯誤。我相信這是因爲芹菜任務是同時啓動的,Spark不會允許我多次同時使用sqlContext。下面是一個簡單的例子:使用pyspark的Celery任務的Flask API不能同時運行多個任務

1日腳本:

import os 
import sys 
import redis 
import configparser 
from celery import Celery 
from celery.result import AsyncResult 
from flask_cors import CORS, cross_origin 
from flask import Flask, request, jsonify, url_for 

# import the script where the script is stored 
from celery_task import one_task 
# configurate and iniate the Flask API 
flask_app = Flask(__name__) 
# allow users from other networks to use the API 
CORS(flask_app) 

flask_app.debug = True 
flask_app.config['celery_broker_url'] = 'redis://localhost:6379/0' 
flask_app.config['celery_result_backend'] = 'redis://localhost:6379/0' 
celery = Celery("my_api",broker='redis://localhost:6379/0') 
# launch celery task app 

celery.conf.update(flask_app.config) 

@flask_app.route('/test', methods=['POST']) 
def job_route(): 
    # the exception raised means the key are not correct 
    task = one_task.apply_async() 
    response_msg = {"payload":"ok"} 
    return jsonify(response_msg) 


if __name__ == '__main__': 
    # launch the app on port from conf file on local public IP adress 
    flask_app.run(host="localhost", port=8000) 

2D腳本:

import os 
import sys 
import celery 
import configparser 
import time 
from celery import Celery 
from pyspark import SparkContext 
from pyspark.sql import SparkSession, SQLContext 
from flask import request, url_for, jsonify 
celery = Celery("my_api",broker='redis://localhost:6379/0') 

sc = SparkContext() 
sc.setLogLevel("WARN") 
sqlContext = SQLContext(sc) 

# dumb celery task 
@celery.task(bind=True) 
def one_task(self): 
task_id = self.request.id 
for i in range(10): 
    l = [('Alice', 1)] 
    data = sqlContext.createDataFrame(l, ['name', 'age']) 
    data = data.withColumnRenamed("nickname","name") 
    data.show() 
    # call a useless operation in order to show the error 

我永遠不會有同樣的錯誤,所以兩次我不會發布它。我認爲一個解決辦法是強迫芹菜一次不能發起多個任務。我看到在這個鏈接中可以使用django(http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html),我想嘗試使用Flask的類似解決方案。

爲了正確運行代碼必須使用3個端子:在第一終端:

redis-server 

2D終端:

celery -A api_debug worker --loglevel=info 

3D終端:

python api_debug.py 

任何幫助,將不勝感激。

回答

1

我找到了一個解決方案,我不知道它是否是最優的或沒有,但在終端推出芹菜我加了命令:

celery -A api_debug worker --loglevel=info --concurency=1 

所以只有一個隊列被激活,兩個任務不能同時推出。