2010-01-25 86 views
16

我有一個Django項目,我正在嘗試使用Celery提交後臺處理任務(http://ask.github.com/celery/introduction.html)。 Celery與Django完美集成,我已經能夠提交自定義任務並獲得結果。在運行我的任務之前,如何設置Celery調用自定義初始化函數?

唯一的問題是我無法找到在守護進程中執行自定義初始化的理智方式。我需要調用一個昂貴的函數,在我開始處理任務之前加載大量內存,並且我無法每次調用該函數。

有沒有人有過這個問題?任何想法如何解決它而無需修改芹菜源代碼?

謝謝

+0

你需要運行什麼樣的自定義初始化? – diegueus9 2010-01-25 03:37:41

+0

我需要加載處理每個任務所需的〜10MB數據結構(所有任務的結構都是相同的)。 – xelk 2010-01-25 04:12:38

回答

15

您可以編寫自定義加載程序或使用信號。

裝載機具有on_task_init方法,在任務即將執行時調用, 和on_worker_init由celery + celerybeat主進程調用。

使用的信號可能是最簡單的,可用的信號是:

0.8.4:

  • task_prerun(task_id, task, args, kwargs)

    當任務即將由工人來執行調度(或本地 如果使用apply /或者如果CELERY_ALWAYS_EAGER已設置)。

  • task_postrun(task_id, task, args, kwargs, retval) 在與上面相同的條件下執行任務後分派。

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    在0.9.x版本(在github當前主分支)當施加一個任務被調用(不適合長時間運行的操作)可用

附加信號:

  • worker_init()

    當芹菜已啓動時調用(在任務初始化之前,因此如果在 系統支持fork,則任何內存更改將被複制到子工作進程 )。

  • worker_ready()

    時調用celeryd能夠接收任務。

  • worker_shutdown()

    時celeryd正在關閉調用。

下面是一個例子預先計算的東西第一次任務的過程中運行:

from celery.task import Task 
from celery.registry import tasks 
from celery.signals import task_prerun 

_precalc_table = {} 

class PowersOfTwo(Task): 

    def run(self, x): 
     if x in _precalc_table: 
      return _precalc_table[x] 
     else: 
      return x ** 2 
tasks.register(PowersOfTwo) 


def _precalc_numbers(**kwargs): 
    if not _precalc_table: # it's empty, so haven't been generated yet 
     for i in range(1024): 
      _precalc_table[i] = i ** 2 


# need to use registered instance for sender argument. 
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name]) 

如果你想對所有任務運行功能,只跳過sender說法。