2011-10-05 76 views
2

我需要在特定的芹菜實例上運行一些任務。所以我配置的隊列:芹菜和路由

celeryconfig.py:

CELERY_QUEUES = { 
    'celery': { 
     'exchange': 'celery', 
     'binding_key': 'celery', 
    }, 
    'import': { 
     'exchange': 'import', 
     'binding_key': 'import.products', 
    }, 
} 

CELERY_ROUTES = { 
    'celery_tasks.import_tasks.test': { 
     'queue': 'import', 
     'routing_key': 'import.products', 
    }, 
} 

import_tasks.py:

@task 
def test(): 
    print 'test' 

@task(exchange='import', routing_key='import.products') 
def test2 
    print 'test2' 

然後我開始celeryd:

celeryd -c 2 -l INFO -Q import 

並嘗試執行任務。 'test'執行但'test2'不執行。但我不想在CELERY_ROUTES中指定每個導入任務。我如何指定哪個隊列應該在任務定義中執行任務?

+0

哦,忘了說我使用redis作爲經紀人。 –

回答

0

哦,忘了說我已經使用了send_task函數來執行任務。而這個功能不會導入任務。它只是將任務的名稱發送到隊列中。

因此,不是這樣的:

from celery.execute import send_task 

result = send_task(args.task, task_args, task_kwargs) 

我寫道:

from celery import current_app as celery_app, registry as celery_registry 

celery_imports = celery_app.conf.get('CELERY_IMPORTS') 
if celery_imports: 
    for module in celery_imports: 
     __import__(module) 

task = celery_registry.tasks.get(args.task) 
if task: 
    result = task.apply_async(task_args, task_kwargs) 
0

我找到的解決方案,滿足幾乎我:

class CustomRouter(object): 
    def route_for_task(self, task, args=None, kwargs=None): 
     if task.startswith('celery_tasks.import_tasks'): 
      return {'exchange': 'import', 
        'routing_key': 'import.products'} 

CELERY_ROUTES = (
    CustomRouter(), 
) 

問題是,現在我不能使用任務名稱。