2016-01-20 61 views
0

我有一個RabbitMQ設置,具有以下配置。多交換隊列模型的RabbitMQ客戶端設計

  • 每個Exchange是FANOUT
  • 附連到每個交換多個隊列。
  • BlockingConnection由消費者製造。
  • 單個消費者處理所有回調。

問題 -

一些有效載荷需要更長的時間比其他人來處理,這導致消費者賦閒,即使有其他隊列的有效載荷。

問題 -

  1. 我應該如何實現消費者,以避免長時間等待? 應該爲每個模塊運行單獨的消費者嗎?任何用戶體驗?
  2. 我可以配置RabbitMQ來處理這些情況嗎?如果是這樣的話?

回答

0

首先很高興知道爲什麼你有多個扇出交換?你真的需要這個嗎?扇出交換髮送消息到所有隊列...

  1. 只有更多的消費者。檢查this example from rabbitmq tutorial
  2. 你真的不需要明確地配置rabbitmq,一切都可以由客戶端(發佈者和訂閱者)來完成,你只需要弄清楚你需要多少交換機以及它們應該是哪一種類型等等。
0

首先,你用什麼編程語言?最常見的語言,如python,java,c#,都支持爲並行進程創建額外的線程。

比方說,你消耗的隊列像下面(假性代碼):

def callback(ch, method, properties, body) ... 
def threaded_function(ch, method, properties, body) ... 

channel.basic_qos(prefetch_count=3) 
channel.basic_consume(callback, queue='task_queue') 
channel.start_consuming() 

第一,設置「prefetch_count = 3」讓你的消費者必須在-最不-ACK狀態3個消息兼任。

在回調方法中,您應該啓動一個線程來執行帶有threaded_function的每條消息。在threaded_function方法體的結束,這樣做:

ch.basic_ack(delivery_tag = method.delivery_tag) 

,這樣,在最3個消息可以並行處理,即使需要花費更長的時間爲一個或兩個線程來運行,別人仍然可以的處理下一個消息。