2012-03-20 34 views
5

I'm a little confused about what my configuration should look like for setting up topic exchanges with Celery and RabbitMQ:

http://www.rabbitmq.com/tutorials/tutorial-five-python.html

Here is what I'm trying to accomplish:

Task1 -> send to QueueOne and QueueFirehose 
Task2 -> send to QueueTwo and QueueFirehose 

Then:

Task1 -> consume from QueueOne 
Task2 -> consume from QueueTwo 
TaskFirehose -> consume from QueueFirehose 

I only want Task1 to consume from QueueOne and Task2 to consume from QueueTwo.

The problem right now is that when Task1 and Task2 run, they also drain QueueFirehose, and the TaskFirehose task never executes.

Is something wrong with my configuration, or am I misunderstanding something?

CELERY_QUEUES = {
    "QueueOne": {
        "exchange_type": "topic",
        "binding_key": "pipeline.one",
    },
    "QueueTwo": {
        "exchange_type": "topic",
        "binding_key": "pipeline.two",
    },
    "QueueFirehose": {
        "exchange_type": "topic",
        "binding_key": "pipeline.#",
    },
}

CELERY_ROUTES = {
    "tasks.task1": {
        "queue": "QueueOne",
        "routing_key": "pipeline.one",
    },
    "tasks.task2": {
        "queue": "QueueTwo",
        "routing_key": "pipeline.two",
    },
    "tasks.firehose": {
        "queue": "QueueFirehose",
        "routing_key": "pipeline.#",
    },
}
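One thing worth noting about the queue declarations above: none of them names an exchange. In Celery's old-style CELERY_QUEUES settings, a queue with no explicit "exchange" entry is, as far as I recall, bound to an exchange named after the queue itself, so the three queues would not share one topic exchange. A sketch with an explicit shared exchange (the exchange name "pipeline" is my own illustrative choice, not from the original post):

```python
# Sketch: the same three queues, all bound to ONE shared topic exchange.
# The exchange name "pipeline" is an assumption for illustration.
CELERY_QUEUES = {
    "QueueOne": {
        "exchange": "pipeline",
        "exchange_type": "topic",
        "binding_key": "pipeline.one",
    },
    "QueueTwo": {
        "exchange": "pipeline",
        "exchange_type": "topic",
        "binding_key": "pipeline.two",
    },
    "QueueFirehose": {
        "exchange": "pipeline",
        "exchange_type": "topic",
        "binding_key": "pipeline.#",
    },
}

# Every queue now binds to the same topic exchange, so a single published
# message can be copied into each queue whose binding key matches.
exchanges = {q["exchange"] for q in CELERY_QUEUES.values()}
print(exchanges)  # {'pipeline'}
```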
+0

Maybe this is just terminology to clarify, but your description sounds like you are confusing tasks and workers. For example, you say "Task2 is sent to QueueTwo" and then "Task2 consumes from QueueTwo". Tasks don't consume; they are consumed (by workers). You also say "the TaskFirehose task never executes", but in your description no TaskFirehose is ever sent to any queue. The basic concept is: tasks are sent to queues, and workers execute tasks from the queues they are assigned to. Tasks != the workers that execute them. – 2013-08-25 17:26:13

Answer

0

Assuming you actually meant something like this:

Task1 -> send to QueueOne 
Task2 -> send to QueueTwo 
TaskFirehose -> send to QueueFirehose 

Then:

Worker1 -> consume from QueueOne, QueueFirehose 
Worker2 -> consume from QueueTwo, QueueFirehose 
WorkerFirehose -> consume from QueueFirehose 
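The reason QueueFirehose sees every message is topic matching: a routing key like pipeline.one matches both the exact binding pipeline.one and the wildcard binding pipeline.#, so the broker delivers a copy to each matching queue, and each worker only drains the queues it subscribes to. A minimal sketch of AMQP-style topic matching (my own helper for illustration, not a Celery or RabbitMQ API):

```python
def topic_matches(binding: str, routing: str) -> bool:
    """AMQP-style topic matching: '*' matches exactly one word,
    '#' matches zero or more dot-separated words."""
    def match(b, r):
        if not b:
            return not r
        if b[0] == '#':
            # '#' may swallow zero or more of the remaining words
            return any(match(b[1:], r[i:]) for i in range(len(r) + 1))
        if not r:
            return False
        return b[0] in ('*', r[0]) and match(b[1:], r[1:])
    return match(binding.split('.'), routing.split('.'))

# A message routed with 'pipeline.one' is copied to every queue whose
# binding matches, so both QueueOne and QueueFirehose receive it.
print(topic_matches('pipeline.one', 'pipeline.one'))  # True
print(topic_matches('pipeline.#', 'pipeline.one'))    # True
print(topic_matches('pipeline.two', 'pipeline.one'))  # False
```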

That may not be exactly what you meant, but I think it covers many scenarios, hopefully yours too. Something like this should work:

# Advanced example starting 10 workers in the background: 
# * Three of the workers process the images and video queue 
# * Two of the workers process the data queue with loglevel DEBUG 
# * The rest process the 'default' queue. 

$ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data \
    -Q default -L:4,5 DEBUG 

More options and reference: http://celery.readthedocs.org/en/latest/reference/celery.bin.multi.html

This is straight from the docs.

I had a similar situation, and I solved it in a slightly different way. I couldn't use celery multi with supervisord, so instead I created multiple programs in supervisord, one per worker. The workers will be in different processes anyway, so just let supervisord handle everything for you. The config file looks like:

; ================================== 
; celery worker supervisor example 
; ================================== 

[program:Worker1] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueOne,QueueFirehose 

directory=/path/to/project 
user=nobody 
numprocs=1 
stdout_logfile=/var/log/celery/worker1.log 
stderr_logfile=/var/log/celery/worker1.log 
autostart=true 
autorestart=true 
startsecs=10 

; Need to wait for currently executing tasks to finish at shutdown. 
; Increase this if you have very long running tasks. 
stopwaitsecs = 600 

; When resorting to send SIGKILL to the program to terminate it 
; send SIGKILL to its whole process group instead, 
; taking care of its children as well. 
killasgroup=true 

; if rabbitmq is supervised, set its priority higher 
; so it starts first 
priority=998 

Similarly, for Worker2 and WorkerFirehose, edit the corresponding lines:

[program:Worker2] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueTwo,QueueFirehose 

[program:WorkerFirehose] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueFirehose 

Include them all in your supervisord .conf file, and that should do it.
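After editing the config, supervisord has to pick up the new program sections. Assuming a standard supervisor install, something like this (a sketch of the usual workflow, not part of the original answer):

```shell
# Make supervisord discover the new [program:...] sections,
# apply the changes, and check the workers came up.
supervisorctl reread   # re-scan the config for added/changed programs
supervisorctl update   # start/restart programs whose config changed
supervisorctl status   # verify Worker1, Worker2, WorkerFirehose are RUNNING
```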