我使用的RabbitMQ RPC來獲取在多線程數據的env,我下面下面通道basicConsume消費者內部類是不GCed在jrabbitmq AVA API
- 創建用於整個應用程序單一連接。
- 每個線程創建一個通道。
- 創建消費者從每個線程消耗並返回。
問題:
我的問題是第三個我不能夠得到的RabbitMQ的Java API這一次又一次的用法相同的消費者。
如果我讓每個頻道創建一個消費者,那麼該消費者從不會因爲鏈接到頻道的原因而被GCed,並且該頻道永遠不會被刪除(安裝點2)。但是,如果我每次刪除頻道它得到GCed。
可能的方法解決:
我應該創建一個同步的HashMap,並保持所有消費者併發哈希表內,並加載它來回?
rabbit mq basicGet()API:這似乎很混亂: 有些時候它會返回空響應,而rpc-server會繼續返回正確的值。可能的解決方法是放置
while(true){ // //保持輪詢。 //一旦獲取數據就會中斷。 }
這似乎對我來說是一個破解。
我不認爲任何其他選項適合在這裏。
下面是我的代碼我使用以消耗來自兔MQ RPC數據:
public String fetchDataFromRpc(String requestQueueName, byte[] message, Channel channel) throws IOException, InterruptedException {
final BlockingQueue<String> response = new LinkedBlockingQueue<>();
String replyQueueName = channel.queueDeclare().getQueue();
String corrId = UUID.randomUUID().toString();
System.out.println(" corr id : "+corrId);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
channel.basicPublish("", requestQueueName, props, message);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
channel.queueDelete(replyQueueName);
System.out.println(" res corr id : "+corrId);
}
}
};
channel.basicConsume(replyQueueName, true, consumer);
return response.take();
}
行: channel.basicConsume(replyQueueName,真實,消費者); 不斷創建新的消費者每次線程來到這個塊和舊的消費者永遠不會得到GCed,我得到它,因爲它鏈接到渠道,因此不被GCed所以我可以使用HashMap來存儲每個線程的所有消費者,並繼續從那裏加載? 。