有沒有一種方法可以通過編程方式確定當前導入/運行的模塊是否在芹菜工作環境中完成?如何檢測我是否在芹菜工人身上運行?
我們已經決定在運行Celery worker之前設置一個環境變量,並在代碼中檢查這個環境變量,但是我想知道是否有更好的方法?
有沒有一種方法可以通過編程方式確定當前導入/運行的模塊是否在芹菜工作環境中完成?如何檢測我是否在芹菜工人身上運行?
我們已經決定在運行Celery worker之前設置一個環境變量,並在代碼中檢查這個環境變量,但是我想知道是否有更好的方法?
添加環境變量是檢查芹菜工作人員是否運行模塊的好方法。在任務提交者進程中,我們可以設置環境變量,以標記它不在芹菜工作者的上下文中運行。
但更好的方法可能是使用一些芹菜信號,這可能有助於知道模塊是否在工作人員或任務提交者中運行。例如,worker-process-init信號被髮送到每個子任務執行程序進程(以預先執行模式),並且該處理程序可用於設置某個全局變量,指示它是一個工作進程。
根據您的用例場景到底是什麼,你可以通過檢查請求ID是否設置爲檢測它:
@app.task(bind=True)
def foo(self):
print self.request.id
如果你調用上面foo.delay()
那麼任務將是發送給工作人員並且self.request.id
將被設置爲唯一編號。如果您調用它作爲foo()
,那麼它將在您當前的進程中執行,並且self.request.id
將爲None
。
啓動帶有名稱的工作人員是一種很好的做法,以便管理(停止/終止/重新啓動)它們變得更加容易。您可以使用-n
來命名工人。現在
celery worker -l info -A test -n foo
,在腳本中可以使用app.control.inspect
,看看是否該名工人正在運行。
In [22]: import test
In [23]: i = test.app.control.inspect(['foo'])
In [24]: i.app.control.ping()
Out[24]: [{'[email protected]': {'ok': 'pong'}}]
您可以從Celery
應用實例類使用current_worker_task
屬性。 Docs here.
隨着定義了以下任務:
# whatever_app/tasks.py
celery_app = Celery(app)
@celery_app.task
def test_task():
if celery_app.current_worker_task:
return 'running in a celery worker'
return 'just running'
您可以運行在一個Python殼以下:
In [1]: from whatever_app.tasks import test_task
In [2]: test_task()
Out[2]: 'just running'
In [3]: r = test_task.delay()
In [4]: r.result
Out[4]: u'running in a celery worker'
注:顯然,對於test_task.delay()
成功,你需要有在至少有一名芹菜工人正在運行並配置爲從whatever_app.tasks
加載任務。
'celery.current_app'也許? – BlackBear