2017-03-04 617 views
0

我想在Celery任務中實現線程池。Python多線程到Celery任務中。 celery_task.update_state()錯誤

我芹菜任務調用update_state()函數來發送有關任務狀態DB信息。它運作成功。 但是,當我將線程添加到任務並嘗試在每個線程中調用update_state()函數時 - Celery會返回錯誤。

這是工作例(無螺紋):

import celery 

@celery.task(bind=True) 
def get_info(self, user): 
    for i in xrange(4): 
     self.update_state(state=states.SUCCESS, meta={'subtask_id': i}) 

這不是工作示例(帶螺紋):

import celery 
import threading 

lock = threading.Lock() 

def run_subtask(celery_task, i): 
    lock.acquire() 
    #Error raises here, when update_state calls 
    celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i}) 
    lock.release() 

@celery.task(bind=True) 
def get_info(self, user): 

    for i in xrange(4): 
     worker = threading.Thread(target=run_subtask, args=(self, i)) 
     worker.start() 

的錯誤是:

[2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py", 
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key), 
    [2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found 

是什麼原因?爲什麼我不能將update_state()調用到線程中?

回答

0

我找到了答案!這是從芹菜出資人之一的答案:

task.request是一個線程局部,所以只執行任務的線程可以調用update_state。

這尤其是有道理的,如果你認爲線程可以與任務後處理程序存儲結果比賽。

您可以將TASK_ID傳遞給線程:

cp_self.update_state(task_id=task_id, state='PROGRESS', meta={'timeout': to}) 

但你必須讓該死的肯定線程加入和任務退出之前停止(的Thread.join())。 在你的例子中,線程只能在while循環退出之後才能被連接,並且由於你正在休眠1秒,連接可能會被延遲。

1

芹菜增加了一種上下文對象的線程,所以它知道它的任務是涉及到。爲了將線程與任務相關聯,您需要執行以下操作:

from celery.app import push_current_task 


def run_subtask(celery_task, i): 
    push_current_task(celery_task) 

    ... 

    pop_current_task() 
+0

非常感謝!我已經找到解決方案並將其發佈到此處。我沒有測試你的解決方案,但我認爲它有相同的方向。 – Denti

+0

礦是文檔建議的方式。我會建議你使用我的。 –