python
  • rabbitmq
  • celery
  • 2012-08-09 74 views 8 likes 
    8

    我想調用一個任務,併爲該任務創建一個隊列,如果它不存在,那麼立即插入到該隊列的被調用任務。我有以下代碼:芹菜動態隊列創建和路由

    @task 
    def greet(name): 
        return "Hello %s!" % name 
    
    
    def run(): 
        result = greet.delay(args=['marc'], queue='greet.1', 
         routing_key='greet.1') 
        print result.ready() 
    

    然後我有一個自定義路由器:

    class MyRouter(object): 
    
        def route_for_task(self, task, args=None, kwargs=None): 
         if task == 'tasks.greet': 
          return {'queue': kwargs['queue'], 
            'exchange': 'greet', 
            'exchange_type': 'direct', 
            'routing_key': kwargs['routing_key']} 
         return None 
    

    這將創建一個名爲交流和greet.1排隊叫greet.1但隊列爲空。交換機應該叫做greet,該交換機知道如何將路由密鑰(如greet.1)路由到名爲greet.1的隊列。

    任何想法?

    回答

    13

    當你做到以下幾點:

    task.apply_async(queue='foo', routing_key='foobar') 
    

    然後芹菜會採取默認值從CELERY_QUEUES, 「富」隊列或如果不存在,那麼使用(隊列= FOO,Exchange會自動創建= FOO,routing_key = FOO)

    所以,如果 '富' 不存在CELERY_QUEUES你最終會用:

    queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo') 
    

    生產者將隨後宣佈,隊列,但因爲你重寫routing_key, 實際發送使用routing_key = 'foobar'

    這似乎很奇怪的消息,但該行爲是話題的交流, 在那裏你發佈到不同主題的真正有用的。

    雖然很難做到你想要的,但你可以自己創建隊列 並聲明它,但這不適用於自動發佈消息的重試。 如果apply_async的隊列參數可以支持 自定義kombu.Queue而不是將聲明並用作目標,那將會更好。 也許你可以在http://github.com/celery/celery/issues

    +0

    我不再擔心手動創建隊列打開該問題,而不是隻產生一個新的工作自動創建隊列和交換這使得更多的意義我的問題。一如既往,感謝您的回覆。 :) – Marconi 2012-08-18 06:38:17

    相關問題