我有一個應用程序使用RabbitMQ作爲消息隊列來發送/接收兩個組件之間的消息:發送者和接收者。發送者以非常快的方式發送消息。接收器接收到該消息,然後執行一些非常耗時的任務(主要是爲非常大的數據大小編寫數據庫)。由於接收者需要很長時間才能完成任務,然後檢索隊列中的下一條消息,因此發送者將會繼續快速填充隊列。所以我的問題是:這會導致消息隊列溢出嗎?RabbitMQ:快速生產者和慢速消費者
的消息消費者看起來如下:
public void onMessage() throws IOException, InterruptedException {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
JSONObject json = new JSONObject(message);
String caseID = json.getString("caseID");
//following takes very long time
dao.saveToDB(caseID);
}
}
消費者收到的每條消息都含有caseID。對於每個caseID,它會將大量數據保存到數據庫,這需要很長時間。目前只有一個消費者爲RabbitMQ設置,因爲生產者/消費者使用相同的隊列來發布/訂閱caseID。那麼如何加快消費者吞吐量,讓消費者趕上生產者並避免隊列中的消息溢出?我應該在消費者部分使用多線程來加快消費速度嗎?還是應該使用多個消費者同時使用收到的消息?或者是否有任何異步方式讓消費者異步使用消息而不等待完成?歡迎任何建議。
從[RabbitMQ的文檔](http://www.rabbitmq.com/tutorials/tutorial-three-python.html),這裏有兩種方法:工作者隊列,發佈/訂閱。我現在正在使用pub/sub模型。我應該使用工作者隊列來代替多個消費者嗎? – tonga 2014-10-28 20:30:46
對於你需要的應該是工作隊列。這是如何實現https://github.com/victorpictor/Hotel/blob/master/Infrastructure/MessageTransport/Receivers/Subscriber.cs#L29 – voutrin 2014-10-28 20:35:53
但是,如果我想用幾個隊列用於不同的目的呢?現在,caseID消息只有一個隊列。除caseID外,可能還有更多的數據。所以我可能需要使用發佈/訂閱模式來擁有多個隊列。 – tonga 2014-10-28 20:42:29