2017-09-22 98 views
0

我用用芹菜的燒瓶中的應用,這裏是我的配置:芹菜多個隊列無法正常工作。所有任務都發送到默認隊列

app.config['CELERY_TASK_QUEUES'] = (
    Queue('fast', Exchange('fast'), routing_key='fast'), 
    Queue('default', Exchange('default'), routing_key='default'), 
    Queue('processing', Exchange('processing'), routing_key='processing'), 
) 

app.config['CELERY_TASK_ROUTES'] = { 
    'app.tasks.extract_text': {'queue': 'processing', 'routing_key': 'processing'}, 
    ... 

    'app.tasks.vt_notifications': {'queue': 'default', 'routing_key': 'default'}, 
    ... 

    'app.tasks.update_files_from_search': {'queue': 'fast', 'routing_key': 'fast'}, 
    ... 
} 


app.config['CELERY_DEFAULT_QUEUE'] = 'default' 
app.config['CELERY_DEFAULT_EXCHANGE'] = 'default' 
app.config['CELERY_DEFAULT_ROUTING_KEY'] = 'default' 

我結束了與運行芹菜情況是這樣的:

celery -A app.tasks.celery worker -Q 'processing' --concurrency 1 -l debug -n processing 
celery -A app.tasks.celery worker -Q 'fast' --concurrency 1 -l debug -n fast 
celery -A app.tasks.celery worker -Q 'default' --concurrency 1 -l debug -n default 

所以,問題是所有的任務都被髮送到'默認'隊列。任何幫助,高度讚賞。謝謝!

+0

celery version? – ItayB

回答

3

如果使用芹菜> 4,我會推薦幾件事情: 首先,嘗試添加name到 你的任務(以確保您在CELERY_TASK_ROUTES使用正確的名稱,例如:

@app.task(name='extract_text']) 
    def extract_text(..): 
     pass 

二,試圖改變CELERY_TASK_ROUTES到:

CELERY_ROUTES = { 
    'extract_text': { 
     'exchange': 'processing', 
     'exchange_type': 'direct', 
     'routing_key': 'processing' 
    } 
} 

(而不是queue - 嘗試添加exchangeexchange_type

最後一件事,你不必使用它,只是爲了調試,就可以觸發時明確地航線任務:

(extract_text.signature(args=(...), queue='processing')).delay() 

編輯:

你確定你正在使用的配置如所須?這裏是一個例子:

celery_app = Celery() 
celeryconfig = {} 
celeryconfig['BROKER_URL'] = 'amqp://' 
celeryconfig['CELERY_RESULT_BACKEND'] = 'redis://localhost' 
celeryconfig['CELERY_QUEUES'] = (
    Queue('fast', Exchange('fast'), routing_key='fast'), 
    Queue('default', Exchange('default'), routing_key='default'), 
    Queue('processing', Exchange('processing'), routing_key='processing'), 
) 
celeryconfig['CELERY_ROUTES'] = { 
    'extract_text': { 
     'exchange': 'processing', 
     'exchange_type': 'direct', 
     'routing_key': 'processing' 
    } 
} 

celery_app.config_from_object(celeryconfig) 
+2

如果我在觸發時明確地路由任務,它完美地工作。看起來像CELERY_TASK_ROUTES變量只是被忽略 –

+0

所以,你能接受答案嗎? :-) – ItayB

+1

您是否嘗試過'CELERY_ROUTES'?也許你沒有根據需要使用配置,我會在第二秒更新我的答案 – ItayB