2017-07-16 58 views
1

我有一個芹菜任務返回一個列表。在此之後,我希望將列表中的每個元素髮送到2任務鏈。據我所見,這就像是chord的反面。即而不是將一個任務作爲group的回調,我想要一個group任務作爲單個任務的回調。芹菜組作爲回調

喜歡的東西: group(chain(validate.s(i) | run.s(i))() for i in results_from_first_task)

是有辦法的第一個任務完成後自動執行這一羣體?

作爲一個簡單的例子,假設一個簡單的任務返回的文件列表:

@app.task() 
def list_files(pattern): 
    return glob.glob(pattern) 

而另一對夫婦,其在單個文件執行操作任務:

@app.task() 
def validate(path): 
    return my_validation_function(path) 

@app.task() 
def run(path): 
    return my_run_function(path) 

我想從list_files的結果中爲每個條目執行validaterun

回答

0

您可以使用芹菜信號排隊您的任務。

from celery.signals import task_success 


@task_success.connect() 
def task_success_handler(sender=None, headers=None, body=None, **kwargs): 
    result = kwargs['result'] 
    for file in result: 
     validate.apply_async(file) 
     run.apply_async(file) 

或者,你可以創建一箇中間任務,並用它來排隊等任務

@app.task() 
def process(result): 
    for file in result: 
     validate.apply_async(file) 
     run.apply_async(file) 

現在你可以在你的小組使用此任務。