我想在Celery任務中實現線程池。Python多線程到Celery任務中。 celery_task.update_state()錯誤
我芹菜任務調用update_state()函數來發送有關任務狀態DB信息。它運作成功。 但是,當我將線程添加到任務並嘗試在每個線程中調用update_state()函數時 - Celery會返回錯誤。
這是工作例(無螺紋):
import celery
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
self.update_state(state=states.SUCCESS, meta={'subtask_id': i})
這不是工作示例(帶螺紋):
import celery
import threading
lock = threading.Lock()
def run_subtask(celery_task, i):
lock.acquire()
#Error raises here, when update_state calls
celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
lock.release()
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
worker = threading.Thread(target=run_subtask, args=(self, i))
worker.start()
的錯誤是:
[2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py",
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key),
[2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found
是什麼原因?爲什麼我不能將update_state()調用到線程中?
非常感謝!我已經找到解決方案並將其發佈到此處。我沒有測試你的解決方案,但我認爲它有相同的方向。 – Denti
礦是文檔建議的方式。我會建議你使用我的。 –