2011-11-22 768 views
7

通過繼承PeriodicTask,我設法在django-celery中獲得定期任務。我試圖創建一個測試任務,並設置它運行一些無用的東西。有用。停止/清除Django-Celery中的定期任務

現在我無法阻止它。我讀過文檔,我找不到如何從執行隊列中刪除任務。我嘗試過使用celeryctl並使用shell,但registry.tasks()是空的,所以我看不到如何刪除它。

我已經看到了我應該「撤銷」它的建議,但是爲此我需要一個任務ID,而且我看不到如何找到任務ID。

謝謝。

回答

21

任務是一條消息,「週期性任務」定期發送任務消息。每個發送的任務都會分配一個唯一的ID。

revoke將只取消單個任務消息。要獲取任務的ID,您必須保留髮送ID的 跟蹤,但您也可以在發送任務時指定自定義ID。

我不確定是否要取消單個任務消息,或者如果您想停止發送更多消息的定期任務,那麼我會列出兩個答案。

沒有內置的方式來保持與週期性任務, 但你可以設置ID爲每個任務週期性任務的名義發送任務的ID,這樣 的ID將指任何與週期性任務一起發送的任務(通常是最後一個任務)。 您可以指定一個自定義的ID這樣,

或者與@periodic_task裝飾:

@periodic_task(options={"task_id": "my_periodic_task"}) 
def my_periodic_task(): 
    pass 

或與CELERYBEAT_SCHEDULE設置:

CELERYBEAT_SCHEDULE = {name: {"task": task_name, 
           "options": {"task_id": name}}} 

如果你想簡單地刪除您週期性任務從代碼庫中刪除@periodic_task,或從CELERYBEAT_SCHEDULE刪除條目。 如果您使用的是Django數據庫調度程序,則必須從Django Admin界面中刪除定期任務 。

PS1:revoke不會停止已經啓動的任務。它只取消尚未開始的 任務。您可以使用 revoke(task_id, terminate=True)來終止正在運行的任務。默認情況下,如果您想發送另一個信號(例如KILL),使用 revoke(task_id, terminate=True, signal="KILL"),則會將TERM信號發送到 的過程。

PS2:撤銷是一個遠程控制命令,所以它只支持RabbitMQ 和Redis broker傳輸。 如果你希望你的任務通過存儲在數據庫中的cancelled 標誌支持取消,你應該這樣做,有任務檢查標誌啓動時:

from celery.task import Task 

class RevokeableTask(Task): 
    """Task that can be revoked. 

    Example usage: 

     @task(base=RevokeableTask) 
     def mytask(): 
      pass 
    """ 

    def __call__(self, *args, **kwargs): 
     if revoke_flag_set_in_db_for(self.request.id): 
      return 
     super(RevokeableTask, self).__call__(*args, **kwargs) 
+0

謝謝你這個非常透徹的答案。 –

3

以防萬一這可能幫助別人...我們在工作中遇到了同樣的問題,並且竭力尋找某種管理命令來刪除週期性任務,但我們做不到。所以這裏有一些指針。

您應該首先仔細檢查您正在使用的scheduler class

默認調度程序是celery.beat.PersistentScheduler,它只是跟蹤本地數據庫文件(擱置)中的上次運行時間。

在我們的案例中,我們使用的是djcelery.schedulers.DatabaseScheduler類。

Django的芹菜還附帶在Django的數據庫

存儲日程雖然文件沒有提到一個方法,以消除週期性任務調度程序:

使用django-celery的調度程序,您可以從Django Admin添加,修改和刪除定期任務。

我們希望以編程方式或通過shell中的(celery/management)命令執行清除。

由於我們無法找到一個命令行中,我們使用Django的/ Python的外殼:

$ python manage.py shell 
>>> from djcelery.models import PeriodicTask 
>>> pt = PeriodicTask.objects.get(name='the_task_name') 
>>> pt.delete() 

我希望這有助於!