2010-05-17 49 views
2

我正在寫一個小型的Django應用程序,我應該能夠爲每個模型對象創建 其週期性任務,它將以一定的時間間隔與 執行。我使用了這個應用程序芹菜,但我不明白一兩件事:自己的參數在芹菜的PeriodicTask run()方法

class ProcessQueryTask(PeriodicTask): 
    run_every = timedelta(minutes=1) 

    def run(self, query_task_pk, **kwargs): 
     logging.info('Process celery task for QueryTask %d' % 
query_task_pk) 
     task = QueryTask.objects.get(pk=query_task_pk) 
     task.exec_task() 
     return True 

那麼我做如下:

>>> from tasks.tasks import ProcessQueryTask 
>>> result1 = ProcessQueryTask.delay(query_task_pk=1) 
>>> result2 = ProcessQueryTask.delay(query_task_pk=2) 

首先調用是成功的,但其他定期調用返回錯誤 - TypeError:run()在 celeryd服務器中使用恰好2個非關鍵字參數(給出1)。 我可以將自己的參數傳遞給PeriodicTask run()嗎?

回答

5

這被Ask Solem在his response to your questioncelery-users Google group奇妙地回答了。

定期任務不使用參數,因此您需要創建幾個 類或定期處理一個以上的「模型」任務。

例如爲:

from celery.task import PeriodicTask 
from celery.decorators import periodic_task 

# base class 
class BaseProcessQueryTask(PeriodicTask): 
    abstract = True 
    run_every = timedelta(minutes=1) 
    query_task_pk = None 

    def run(self): 
     task = QueryTask.objects.get(pk=self.query_task_pk) 
     task.exec_task() 

class ProcessQueryTask1(BaseProcessQueryTask): 
    query_task_pk = 1 

class ProcessQueryTask2(BaseProcessQueryTask): 
    query_task_pk = 2 

但它更可能是你想是這樣的:

@task(ignore_result=True) 
def execute_query_task(task): 
    task.exec_task() 

@periodic_task(run_every=timedelta(minutes=1)) 
def process_query_tasks(): 
    for task in QueryTask.objects.all(): 
     ExecuteQueryTask.delay(task)