我想發送消息給RabbitMQ服務器,然後等待回覆消息(在「回覆」隊列中)。當然,我不想永遠等待處理這些消息的應用程序停機 - 這需要超時。這聽起來像是一項非常基本的任務,但我找不到辦法做到這一點。我現在用Java API遇到了這個問題。RabbitMQ等待超時消息
8
A
回答
-1
也有類似的問題。雖然它的答案不使用Java,也許你可以得到一些提示。
0
com.rabbitmq.client.QueueingConsumer
有nextDelivery(long timeout)
方法,它會做你想要什麼。但是,這已被棄用。 編寫你自己的超時並不難,儘管有一個正在進行的線程和一個實時標識符列表可能會更好,而不是始終添加和刪除使用者和關聯的超時線程。
修改即可添加:回覆後注意日期!
0
我使用C#通過創建一個對象跟蹤對特定消息的響應來處理此問題。它爲消息設置一個唯一的回覆隊列,並訂閱它。如果在指定的時間內未收到響應,則倒數計時器會取消訂閱,這會刪除隊列。另外,我擁有可以從我的主線程(使用信號量)或異步(使用回調)同步的方法來利用此功能。
基本上,實施看起來是這樣的:
//Synchronous case:
//Throws TimeoutException if timeout happens
var msg = messageClient.SendAndWait(theMessage);
//Asynchronous case
//myCallback receives an exception message if there is a timeout
messageClient.SendAndCallback(theMessage, myCallback);
2
的RabbitMQ的Java客戶端庫現在supports a timeout argument to its QueueConsumer.nextDelivery()
method。
例如,RPC教程使用下面的代碼:
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
現在,你可以使用consumer.nextDelivery(1000)
等待最大一秒鐘。如果達到超時,則該方法返回null
。
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
// Use a timeout of 1000 milliseconds
QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000);
// Test if delivery is null, meaning the timeout was reached.
if (delivery != null &&
delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}