2017-07-04 84 views
1

我正在使用Celery版本4.0.2。註冊基於類的任務

與先前版本的Celery相比,似乎基於類的任務沒有自動註冊(即,如果您配置了自動發現功能)。

但是,我甚至沒有實現手動註冊基於類的任務。

按照芹菜更改日誌:

http://docs.celeryproject.org/en/latest/changelog.html#version-4-0-1

因爲4.0.1版本,應該可以註冊手動任務:

from celery import Celery, Task 
app = Celery() 

class CustomTask(Task): 

    def run(self): 
     return 'hello' 

app.register_task(CustomTask()) 

但這似乎並沒有工作。有誰知道如何做到這一點?

我想這是正在討論(除了集成在https://github.com/celery/celery/issues/3744提到的自定義任務加載器)提供了一些建議:

Register Celery Class-based Task

https://github.com/celery/celery/issues/3615

https://github.com/celery/celery/issues/3744

回答

1

快到了!您需要在您註冊的任務上致電delay()

這會工作:

from celery import Celery, Task 

app = Celery() 


class CustomTask(Task): 
    def run(self): 
     return 'hello' 


task = CustomTask() 
app.register_task(task) 

task.delay() 
+0

但是,在其他文件上導入「任務」時不起作用。 – harukaeru

0

如果您需要shared_task裝飾:

from celery import Task, shared_task 

class CustomTask(Task): 
    def process(self): 
     return 'hello' 

@shared_task(bind=True, base=CustomTask) 
def custom(self): 
    self.process() 

process是啓動任務自定義名稱(裝飾覆蓋run法)

bind=True結合功能一個類實例

base=CustomTask設置任務的基類