2012-04-06 56 views
3

在Celery中,我正在運行一項主要任務,爲查詢中的每個項目運行一個子任務。子任務應該並行運行。在UI上,我有一個進度條,顯示總共完成了多少個子任務。我正在更新主任務狀態以將信息提供給進度條。我的問題是,在將所有子任務推送給經紀人後,主任務立即結束,因此我無法再更新他的狀態。我希望主要任務可以等到所有子任務完成。可能嗎?其他解決方案?這是我的僞代碼(真實代碼不使用全局;-))。在芹菜如何更新主任務的狀態,直到他所有的子任務完成?

total = 0 
done = 0 

@task(ignore_result=True) 
def copy_media(path): 
    global total, done 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    documents = Document.objects.all() 
    total = documents.count() 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    for document in documents: 
     process_doc.delay(document, path, copy_media) 

@task(ignore_result=True) 
def process_doc(document, path, copy_media): 
    global total, done 
    # Do some stuff 
    done += 1 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 

回答

0

我找到了一種方法,使用TaskSet。但我並不完全滿意,因爲我不能忽視子任務的結果。如果我不理會導致對process_doc任務results.ready()總是返回Falseresults.completed_count()總是返回0,等下面的代碼:

@task(ignore_result=True) 
def copy_media(path): 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    documents = Document.objects.all() 
    total = documents.count() 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    job = TaskSet(tasks=[process_doc.subtask((document, path)) 
         for document in documents]) 
    results = job.apply_async() 
    doc_name = '' 
    while not results.ready(): 
     done = results.completed_count() 
     if done: 
      last = done - 1 
      for idx in xrange(last, -1, -1): 
       if results[idx].ready(): 
        doc_name = results[idx].result 
        break 
     copy_media.update_state(state=STARTED, meta={'total': total, 'done': done, 'doc-name': doc_name}) 
     time.sleep(0.25) 

@task() 
def process_doc(document, path): 
    # Do some stuff 
    return document 
+1

正如文檔中明確指出的那樣:「讓一個任務等待另一個任務的結果真的是效率低下,甚至在工作池耗盡時甚至會導致死鎖 例如通過使用回調使設計異步。 「 http://celery.readthedocs.org/en/latest/userguide/tasks.html#task-synchronous-subtasks – antoinet 2015-06-25 15:09:24

+0

我的主要任務'copy_media'沒有等待另一個任務的結果。它不斷更新狀態以顯示完成了多少子任務等。子任務並行運行,因此回調不是一個選項。最重要的是,我不能有死鎖,因爲'copy_media'一次只能運行一個,所以它只是阻止1個工作者。 – Etienne 2015-06-25 18:03:44

+0

這是有效地等待其他任務的結果。你正在調用results.ready(),並且有一個任務正在測試其他人。如果你的工作人員疲憊不堪,你就會陷入僵局,因爲沒有任何子任務會被執行,你的主要任務將永遠不會結束。 – rsalmei 2016-10-03 17:31:41

0

可以使用memcached的支持緩存來的完成任務門店數量。在django cache API中甚至有cache.inrc用於原子增量,以確保count的併發更新不會造成干擾。

此外,主要任務運行,直到所有子任務完成是一個壞主意,因爲你基本上阻塞了芹菜工作很長一段時間。如果芹菜運行一個工作進程,這將導致永無止境的鎖定。

+0

關於您使用Django緩存存儲計數的建議,我發現奇怪的是必須重新實現Celery中已有的內容,即。一個國家體系。而且我的需求比較複雜,只保留點數。正如你在我的回答中看到的那樣,我還傳遞了文檔名稱(以及更多內容)。由於我的主要任務是阻止一名芹菜工人,所以我可以看到這個問題,但對我來說絕對不是問題。我爲這個主要任務提供了一個專用的Celery守護進程,並且與許多工作人員一起執行了子任務,並且阻止了主要任務同時運行。 – Etienne 2012-04-07 15:54:07

0

我不知道你正在運行哪個版本的芹菜,但你可以看看Group子任務(3.0版本的新功能)。

相關問題