2012-04-17 116 views
37

如果我有一個函數定義如下:如何動態地添加/刪除定期任務,芹菜(celerybeat)

def add(x,y): 
    return x+y 

有沒有辦法動態地添加該功能作爲芹菜PeriodicTask並開始它在運行?我希望能夠做到像(僞):

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30")) 
celery.beat.start(some_unique_task_id) 

我也想停止或動態地刪除該任務有類似的信息(僞):

celery.beat.remove_task(some_unique_task_id) 

celery.beat.stop(some_unique_task_id) 

僅供參考我不使用djcelery,它允許您通過django管理員管理定期任務。

回答

18

不,我很抱歉,這對於常規的celerybeat是不可能的。

但它很容易擴展,可以做你想做的事情,例如, django-celery 調度器只是讀取和寫入數據庫 (頂部有一些優化)的時間表的子類。

此外,即使對於非Django項目,您也可以使用django-celery調度程序。

事情是這樣的:

  • 安裝Django Django的+ - 芹菜:

    $ PIP安裝-U的Django Django的芹菜

  • 添加以下設置到您的celeryconfig:

    DATABASES = { 
        'default': { 
         'NAME': 'celerybeat.db', 
         'ENGINE': 'django.db.backends.sqlite3', 
        }, 
    } 
    INSTALLED_APPS = ('djcelery',) 
    
  • 創建數據庫表:

    與數據庫調度
    $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig 
    
  • 開始celerybeat:

    $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \ 
        -S djcelery.schedulers.DatabaseScheduler 
    

而且還有可以用於非的Django的djcelerymon命令項目 開始celerycam和Django管理Web服務器中同樣的過程,你可以使用 也可以在一個漂亮的網頁界面中編輯你的週期性任務:

$ djcelerymon 

(注意:由於某種原因djcelerymon不能使用Ctrl + C停止,你 必須用Ctrl + Z +殺%1)

+1

你能否提一下代碼來添加任務並刪除?對不起,我沒有得到。 – 2013-10-28 16:26:02

+6

從2012年到2016年這個變化嗎? – Tanay 2016-06-02 09:44:28

32

這個問題被回答了google groups

我不是作者,都歸功於讓馬克

下面是這個妥善的解決辦法。確定工作,在我的情況下, 我分類了週期性任務,並創建了一個模型,因爲我可以將 添加到模型中,因爲我需要其他字段,所以我可以添加 「終止」方法。您必須將定期任務的啓用 屬性設置爲False並在刪除之前將其保存。整個 子類是不是必須的,schedule_every方法是 真的做的工作。當你準備好終止你的任務時(如果你的 沒有繼承它),你可以使用 PeriodicTask.objects.filter(name = ...)來搜索你的任務,禁用 它,然後刪除它。

希望這會有所幫助!

from djcelery.models import PeriodicTask, IntervalSchedule 
from datetime import datetime 

class TaskScheduler(models.Model): 

    periodic_task = models.ForeignKey(PeriodicTask) 

    @staticmethod 
    def schedule_every(task_name, period, every, args=None, kwargs=None): 
    """ schedules a task by name every "every" "period". So an example call would be: 
     TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
     that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """ 
     permissible_periods = ['days', 'hours', 'minutes', 'seconds'] 
     if period not in permissible_periods: 
      raise Exception('Invalid period specified') 
     # create the periodic task and the interval 
     ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task 
     interval_schedules = IntervalSchedule.objects.filter(period=period, every=every) 
     if interval_schedules: # just check if interval schedules exist like that already and reuse em 
      interval_schedule = interval_schedules[0] 
     else: # create a brand new interval schedule 
      interval_schedule = IntervalSchedule() 
      interval_schedule.every = every # should check to make sure this is a positive int 
      interval_schedule.period = period 
      interval_schedule.save() 
     ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule) 
     if args: 
      ptask.args = args 
     if kwargs: 
      ptask.kwargs = kwargs 
     ptask.save() 
     return TaskScheduler.objects.create(periodic_task=ptask) 

    def stop(self): 
     """pauses the task""" 
     ptask = self.periodic_task 
     ptask.enabled = False 
     ptask.save() 

    def start(self): 
     """starts the task""" 
     ptask = self.periodic_task 
     ptask.enabled = True 
     ptask.save() 

    def terminate(self): 
     self.stop() 
     ptask = self.periodic_task 
     self.delete() 
     ptask.delete() 
+1

這應該是公認的答案。 – kai 2015-06-07 16:59:53

+1

@kai'IntervalSchedule','PeriodicTask'等是'djcelery'類,OP說他沒有使用'djcelery'。儘管如此,這絕對有用。 – Chris 2016-03-28 14:14:07

2

你可以看看這個flask-djcelery其配置瓶中,djcelery並且還提供了瀏覽的REST API

2

有一個叫Django的芹菜拍它提供了一個模型庫的需求。爲了使其動態加載新的週期性任務,必須創建自己的調度程序。

from django_celery_beat.schedulers import DatabaseScheduler 


class AutoUpdateScheduler(DatabaseScheduler): 

    def tick(self, *args, **kwargs): 
     if self.schedule_changed(): 
      print('resetting heap') 
      self.sync() 
      self._heap = None 
      new_schedule = self.all_as_schedule() 

      if new_schedule: 
       to_add = new_schedule.keys() - self.schedule.keys() 
       to_remove = self.schedule.keys() - new_schedule.keys() 
       for key in to_add: 
        self.schedule[key] = new_schedule[key] 
       for key in to_remove: 
        del self.schedule[key] 

     super(AutoUpdateScheduler, self).tick(*args, **kwargs) 

    @property 
    def schedule(self): 
     if not self._initial_read and not self._schedule: 
      self._initial_read = True 
      self._schedule = self.all_as_schedule() 

     return self._schedule 
+0

謝謝。如果key不在self.schedule.keys()中''to_add = [key在new_schedule.keys()中的鍵,'to_remove'和'to_remove'類似的做了這個訣竅,但沒有馬上工作。爲什麼這不是一個標準選項?直到現在,我必須讓Celery任務調用其他Celery任務並倒計時。這聽起來不太好。 – freethebees 2017-07-04 16:30:27