2011-10-06 117 views
22

在某些情況下,我想在該任務中使芹菜任務失敗。我試過如下:如何使任務中的芹菜任務失敗?

from celery.task import task 
from celery import states 

@task() 
def run_simulation(): 
    if some_condition: 
     run_simulation.update_state(state=states.FAILURE) 
     return False 

但是,任務仍然報告已經成功:

任務sim.tasks.run_simulation [9235e3a7-c6d2-4219-bbc7-acf65c816e65] 成功1.17847704887s :假

看來,任務運行時的狀態,纔可以修改,一旦完成 - 芹菜改變狀態,不管它是認爲的結果(參見this question)。有什麼辦法,不通過提出異常而使任務失敗,讓芹菜迴歸該任務失敗?

+0

你試圖從你的代碼中引發異常? – hymloth

+0

@hymloth提出異常使得任務失敗,其中包括每次發生這種情況都會發送一封電子郵件 - 這是我想避免的。抱歉不清楚,我現在改變了這個問題。 – Meilo

回答

2

在Ask Solem的這個問題上,我得到了一個interesting reply,他在那裏提出了一個'after_return'處理程序來解決問題。這對未來可能是一個有趣的選擇。

在我通過簡單地從任務返回一個字符串「失敗」時,我想讓它失敗,然後檢查爲如下解決的問題同時:

result = AsyncResult(task_id) 
if result.state == 'FAILURE' or (result.state == 'SUCCESS' and result.get() == 'FAILURE'): 
    # Failure processing task 
+0

如果條件可以寫爲'result.state in READY_STATES | EXCEPTION_STATES:'where where' from celery.states import READY_STATES,EXCEPTION_STATES,UNREADY_STATES' –

14

一個更好的辦法來做到這一點是更新任務狀態FAILURE然後引發Ignore例外,因爲返回任何值將記錄爲成功的例子任務:

from celery import Celery, states 
from celery.exceptions import Ignore 

app = Celery('tasks', broker='amqp://[email protected]//') 

@app.task(bind=True) 
def run_simulation(self): 
    if some_condition: 
     # manually update the task state 
     self.update_state(
      state = states.FAILURE, 
      meta = 'REASON FOR FAILURE' 
     ) 

     # ignore the task so no other state is recorded 
     raise Ignore() 
+0

請注意,這將不會觸發task_failure信號,因此您要附加到這些處理程序的任何處理程序都不會被觸發,而這種處理程序並不好。 –