2014-10-28 898 views
13

我有一個應用程序使用RabbitMQ作爲消息隊列來發送/接收兩個組件之間的消息:發送者和接收者。發送者以非常快的方式發送消息。接收器接收到該消息,然後執行一些非常耗時的任務(主要是爲非常大的數據大小編寫數據庫)。由於接收者需要很長時間才能完成任務,然後檢索隊列中的下一條消息,因此發送者將會繼續快速填充隊列。所以我的問題是:這會導致消息隊列溢出嗎?RabbitMQ:快速生產者和慢速消費者

的消息消費者看起來如下:

public void onMessage() throws IOException, InterruptedException { 
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue(); 
    channel.queueBind(queueName, EXCHANGE_NAME, ""); 

    QueueingConsumer consumer = new QueueingConsumer(channel); 
    channel.basicConsume(queueName, true, consumer); 

    while (true) { 
     QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
     String message = new String(delivery.getBody()); 
     System.out.println(" [x] Received '" + message + "'"); 

     JSONObject json = new JSONObject(message); 
     String caseID = json.getString("caseID"); 
     //following takes very long time    
     dao.saveToDB(caseID); 
    } 
} 

消費者收到的每條消息都含有caseID。對於每個caseID,它會將大量數據保存到數據庫,這需要很長時間。目前只有一個消費者爲RabbitMQ設置,因爲生產者/消費者使用相同的隊列來發布/訂閱caseID。那麼如何加快消費者吞吐量,讓消費者趕上生產者並避免隊列中的消息溢出?我應該在消費者部分使用多線程來加快消費速度嗎?還是應該使用多個消費者同時使用收到的消息?或者是否有任何異步方式讓消費者異步使用消息而不等待完成?歡迎任何建議。

回答

1

「那麼,如何加快消費者吞吐量,讓消費者趕上生產者並避免隊列中的消息溢出?」這是「使用多個消費者同時消費傳入消息」的答案,使用多線程並行運行這些消費者實現原則共享任何東西,http://www.eaipatterns.com/CompetingConsumers.html

+0

從[RabbitMQ的文檔](http://www.rabbitmq.com/tutorials/tutorial-three-python.html),這裏有兩種方法:工作者隊列,發佈/訂閱。我現在正在使用pub/sub模型。我應該使用工作者隊列來代替多個消費者嗎? – tonga 2014-10-28 20:30:46

+0

對於你需要的應該是工作隊列。這是如何實現https://github.com/victorpictor/Hotel/blob/master/Infrastructure/MessageTransport/Receivers/Subscriber.cs#L29 – voutrin 2014-10-28 20:35:53

+0

但是,如果我想用幾個隊列用於不同的目的呢?現在,caseID消息只有一個隊列。除caseID外,可能還有更多的數據。所以我可能需要使用發佈/訂閱模式來擁有多個隊列。 – tonga 2014-10-28 20:42:29

0

作爲答案,我建議:兩者。

您可以利用多個接收器,以及設置每個接收器在單獨的線程中執行任務,從而允許接收器接受隊列中的下一條消息。

當然,這種方法假定每個操作的結果(如果我理解正確的話,在db上寫入)不會以任何方式影響後續操作對其他消息的響應結果。

1

你有很多方法來提高你的表現。

  1. 您可以創建一個更多生產者的工作隊列,這樣您就可以創建一個簡單的負載均衡系統。不要使用交換--->隊列,但只能排隊。閱讀本帖RabbitMQ Non-Round Robin Dispatching

  2. 當你收到一條消息時,你可以創建一個poolthread用於在數據庫中插入數據,但在這種情況下,你必須管理失敗。

但我認爲主要問題是數據庫而不是RabbitMQ。通過良好的調優,多線程和工作隊列,您可以擁有可擴展且快速的解決方案。

讓我知道

14

「這會不會導致消息隊列溢出?」

是的。隨着隊列長度的增加,RabbitMQ將進入「流量控制」狀態以防止過多的內存消耗。它也會開始將消息保存到磁盤,而不是將它們保存在內存中。

「所以,我怎麼能加快消費者的吞吐量,使消費者 可以與製片人趕上並避免 隊列中的消息溢出」

你有2種選擇:

  1. 添加更多消費者。請記住,如果您選擇此選項,您的數據庫現在將被多個併發進程操縱。確保數據庫能夠承受額外的壓力。
  2. 增加消費渠道的價值QOS。這將從隊列中提取更多消息並將其緩存在消費者上。這會增加總體處理時間;如果緩衝了5條消息,則第5條消息將完成消息1 ... 5的處理時間。

「我應該使用多線程在消費部分,以加快 消費率是多少?」

不除非你有一個設計良好的解決方案。嚮應用程序添加並行性會在消費者方面增加很多開銷。您可能會耗盡ThreadPool或限制內存使用量。

在處理AMQP時,您確實需要考慮每個流程的業務需求,以設計最佳解決方案。您的傳入消息的時間敏感程度如何?他們是否需要堅持到數據庫儘快,或者這對您的用戶是否重要,無論這些數據是否立即可用?

如果數據不需要立即保存,則可以修改應用程序,以便使用者只需從隊列中移除消息並將其保存到Redis中的緩存集合中。引入第二個進程,然後依次讀取和處理緩存的消息。這將確保您的隊列長度不會增長到足以導致流量控制,同時防止數據庫被寫入請求轟炸,寫入請求通常比讀取請求更昂貴。您的消費者現在只需從隊列中移除消息,稍後再由另一個進程處理。

+0

謝謝保羅。這是一個非常好的建議。我的數據不需要立即保存在數據庫中。數據庫持久部分需要很長時間,因爲它涉及每種情況的數據解析,然後在一個數據庫插入中保存大量數據(〜10000行)。因此,使用Redis是一個好主意,因爲它是內存緩存。但最終我仍然需要將數據保存到數據庫。那麼在消費者接收消息並保存到Redis後,如何使用Redis完成數據庫寫入任務?如果數據庫插入速度很慢,消費者是否會溢出Redis緩存大小限制? – tonga 2014-10-30 21:43:12

+0

我會消耗來自單個或多個進程的每條消息,一旦它被提交給數據庫,就會清除來自Redis的消息。 Redis中沒有緩存限制 - 您受主機上RAM數量的限制。 1,000,000個相對較小的密鑰大約是200Mb。如果您擔心內存不足,請查看:http://redis.io/topics/memory-optimization – 2014-11-01 15:04:00

+0

我添加了一篇文章,概述了擴展AMQP的方法以及相關的獎勵和缺點:http://insidethecpu.com/2014/11/11/rabbitmq-qos-vs-competing-consumers/ – 2014-11-11 14:16:03

1

雖然確實如此,但增加更多消費者可能會加快速度,真正的問題將會保存到數據庫中。

這裏已經有很多關於添加消費者(線程和/或機器)和改變QoS的答案,所以我不打算重申這一點。相反,您應該認真考慮使用Aggregator模式將消息聚合爲一組消息,然後一次性批量插入數據庫。

您的每條消息的當前代碼可能會打開一個連接,插入數據並關閉該連接(或返回到池)。更糟的是,它甚至可能使用交易。

通過使用聚合器模式,您基本上在刷新之前緩衝數據。

現在寫一個好的聚合器是棘手的。您將需要決定如何緩衝(即每個工作人員都有自己的緩衝區或Redis等中央緩衝區)。我認爲Spring集成有一個聚合器。