2014-10-09 40 views
0

最近,我創建了一個項目芹菜,整個項目看起來像這樣芹菜 - pool_restart不支持添加新任務以及

proj 
    |-- math 
    | |-- tasks.py (add) 
    | 

我開始一個工人用命令「芹菜-A PROJ工人-l info';

-------------- [email protected] v3.1.15 (Cipater) 
---- **** ----- 
--- * *** * -- Windows-7-6.1.7601-SP1 
-- * - **** --- 
- ** ---------- [config] 
- ** ---------- .> app:   XXXX:0x40706d8 
- ** ---------- .> transport: amqp://guest:**@localhost:5679// 
- ** ---------- .> results:  mongodb://localhost:27017/ 
- *** --- * --- .> concurrency: 4 (prefork) 
-- ******* ---- 
--- ***** ----- [queues] 
-------------- .> celery   exchange=celery(direct) key=celery 

,然後添加一個名爲文本新包[最新的代碼結構如下所示]

proj 
    |-- math 
    |  |-- tasks.py (add) 
    | 
    |-- text 
    |  |-- tasks.py (scan) 

在這個時候,我想下面的代碼重新啓動工人

app.control.broadcast('pool_restart', {'modules': ['proj.math.tasks', 
                'proj.text.tasks']}) 

要檢查它,我跑芹菜-A步驟檢查註冊。並得到了以下結果

-> [email protected]: OK 
    * proj.math.tasks.add 
    * proj.math.tasks.minus 
    * proj.text.tasks.scan 

結果表明:掃描已經成功添加的任務。在那之後,我試着用下面的代碼來執行新增的任務

result = app.send_task('proj.text.tasks.scan', args=('fjkdjfdfj.mp3',)) 
print result.get(timeout=1) 

不過,我有以下錯誤

Traceback (most recent call last): 
    File "C:\Python27\lib\site-packages\billiard\pool.py", line 361, in workloop 
    result = (True, prepare_result(fun(*args, **kwargs))) 
    File "C:\Python27\lib\site-packages\celery\app\trace.py", line 349, in _fast_trace_task 
    return _tasks[task].__trace__(uuid, args, kwargs, request)[0] 
    File "C:\Python27\lib\site-packages\celery\app\registry.py", line 27, in __missing__ 
    raise self.NotRegistered(key) 

在第27行是類型芹菜的對象.app.registry.TaskRegistry。我發現存在這種類型的多個對象。也就是說,池中的每個子進程仍然保留TaskRegistry的一個實例。

爲什麼結果celery -A proj檢查註冊包括'掃描',但我仍然得到** NotRegistered'異常?是否因爲池中的子進程不與主進程同步?

回答

0

如果該任務未在當前進程中註冊,則可以使用send_task()來按名稱調用該任務。但一般使用delayapply_async,而不是因爲send_task通話按名稱的任務,可能會導致some errors

你可以打電話給你的任務,

tasks.scan.apply_async(args=['fjkdjfdfj.mp3',], timeout=1) 

如果你真的想調用任務只有send_task,命名你的員工這使得它更容易調用任務。所以你的任務應該看起來像

from celery import task 
@task(name='my_named_task') 
def scan(foo): 
    #do something in background 

然後你可以使用send_task沒有任何問題調用它。

app.send_task('my_named_task', args=('fjkdjfdfj.mp3',)) 
+0

都使用apply_async和重命名的任務不能在我身邊 – Jacky 2014-10-10 01:53:00

+0

池中的每個工作流程工作保持TaskRegistry的一個實例。價值與主流程中的價值不同。這是根本原因嗎? – Jacky 2014-10-10 02:02:03