2017-08-06 81 views
0

我使用的RabbitMQ RPC來獲取在多線程數據的env,我下面下面通道basicConsume消費者內部類是不GCed在jrabbitmq AVA API

  1. 創建用於整個應用程序單一連接。
  2. 每個線程創建一個通道。
  3. 創建消費者從每個線程消耗並返回。

問題:

  1. 我的問題是第三個我不能夠得到的RabbitMQ的Java API這一次又一次的用法相同的消費者。

  2. 如果我讓每個頻道創建一個消費者,那麼該消費者從不會因爲鏈接到頻道的原因而被GCed,並且該頻道永遠不會被刪除(安裝點2)。但是,如果我每次刪除頻道它得到GCed。

可能的方法解決:

  1. 我應該創建一個同步的HashMap,並保持所有消費者併發哈希表內,並加載它來回?

  2. rabbit mq basicGet()API:這似乎很混亂: 有些時候它會返回空響​​應,而rpc-server會繼續返回正確的值。可能的解決方法是放置

    while(true){ // //保持輪詢。 //一旦獲取數據就會中斷。 }

這似乎對我來說是一個破解。

我不認爲任何其他選項適合在這裏。

下面是我的代碼我使用以消耗來自兔MQ RPC數據:

public String fetchDataFromRpc(String requestQueueName, byte[] message, Channel channel) throws IOException, InterruptedException { 
     final BlockingQueue<String> response = new LinkedBlockingQueue<>(); 
     String replyQueueName = channel.queueDeclare().getQueue(); 
     String corrId = UUID.randomUUID().toString(); 
     System.out.println(" corr id : "+corrId); 
     AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); 
     channel.basicPublish("", requestQueueName, props, message); 
     Consumer consumer = new DefaultConsumer(channel) { 
      @Override 
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
       if (properties.getCorrelationId().equals(corrId)) { 
        response.offer(new String(body, "UTF-8")); 
        channel.queueDelete(replyQueueName); 
        System.out.println(" res corr id : "+corrId); 
       } 
      } 
     }; 
     channel.basicConsume(replyQueueName, true, consumer); 

     return response.take(); 
    } 

行: channel.basicConsume(replyQueueName,真實,消費者); 不斷創建新的消費者每次線程來到這個塊和舊的消費者永遠不會得到GCed,我得到它,因爲它鏈接到渠道,因此不被GCed所以我可以使用HashMap來存儲每個線程的所有消費者,並繼續從那裏加載? 。

回答

0

你應該真的basicCancel()消費者,當你完成。

但是,你應該得到一個handleCancel()調用,因爲你刪除了消費者隊列,所以我不明白爲什麼消費者不會GC'd。

爲每個請求創建臨時答覆隊列效率不高。 RabbitMQ現在提供了一個帶有僞回覆隊列的特殊直接replyTo機制。

爲什麼添加標籤?我在你的問題中看不到彈簧代碼。另一方面,如果您是春季店,則RabbitTemplate.sendAndReceive()(和convertSendAndReceive())方法將爲您處理所有這些(和it uses direct replyTo, by default, if the broker is new enough)。

0

你說的是類似下面

@覆蓋 公共無效handleDelivery(字符串consumerTag,信封信封,AMQP.BasicProperties性質,字節[]體)拋出IOException異常{ 如果(properties.getCorrelationId()。等於(corrId)){ response.offer(new String(body,「UTF-8」)); channel.basicCancel(consumerTag); channel.queueDelete(replyQueueName); } }