2015-10-13 66 views
10

說我有兩個芹菜任務:如何芹菜的Task.map處理錯誤

@celery.task 
def run_flakey_things(*args, **kwargs): 
    return run_flakey_and_synchronous_thing.map(
     xrange(10) 
    ).apply_async() 


@celery.task 
def run_flakey_and_synchronous_thing(a): 
    if a % 5: 
     return a 
    raise RuntimeError(a) 

所以,當你去跑run_flakey_things因爲序列中的第一項引發一個異常,它會翻倒直線距離。我想按照地圖的順序爲的順序爲所有項目運行任務,但繼續在異常上運行,並在所有這些完成後引發新的異常。

理想的做法是,如果我能在應用之前的ON_FAILURE添加到xmap對象,但xmap沒有出現是一個完整的任務目標。

回答

0

您可以更改您的返回值以指示並傳播錯誤。像這樣:

import traceback 

@celery.task 
def run_flakey_things(*args, **kwargs): 
    return run_flakey_and_synchronous_thing.map(
     xrange(10) 
    ).apply_async() 


@celery.task 
def run_flakey_and_synchronous_thing(a): 
    d = {'value': None, 'error': None} 
    try: 
     if a % 5: 
      d['value'] = a 
    except: 
     d['error'] = traceback.format_exc() 
    return d 

然後,您可以做幾件事情:

1)改變你的run_flakey_things有和沒有錯誤組的事情,沒有錯誤返回的人,並報告有錯誤的人。

2)處理任何調用的行爲run_flakey_things

0

我沒有用過芹菜。但是,您可以添加一個__call__方法到子任務嗎?喜歡的東西:

class MyTask: 
    def __call__(self, *args, **kwargs): 
     try: 
      super(self, MyTask).__call__(*args, **kwargs) 
     except Exception, exc: 
      # store exc to be raised later in the main task 

@celery.task(base=MyTask): 
def run_flakey_and_synchronous_thing(a): 
    # ... 

然後主run_flakey_things任務,或許覆蓋apply_async()檢索存儲的例外,因爲在How to override __call__ in celery on main?