2016-01-22 73 views
0

我正在使用嵌入式ActiveMQ代理。 我的目標是找到一種方法來檢測隊列上的外部生產者何時失去連接。如何在生產者連接中斷時得到通知?

我開始經紀人是這樣的:

BrokerService broker = new BrokerService(); 
broker.addConnector("tcp://" + LISTEN_DEVICE_IP + ":" + port); 
setLastMessagesPersistent(broker); 
broker.start(); 

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); 
connection = factory.createConnection(); 
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

connection.start(); 

此後,我嘗試添加TransportListener:

((ActiveMQConnection) connection).addTransportListener(new TransportListener() { 
    public void transportResumed() { 
     System.out.println("resumed"); 
    } 
    public void transportInterupted() { 
     System.out.println("interrupted"); 
    } 
    public void onException(IOException arg0) { 
     System.out.println("ioexception: " + arg0); 
    } 
    public void onCommand(Object arg0) { 
     System.out.println("command: " + arg0); 
    } 
}); 

我也註冊一個消費者和ProducerListener這樣的:

Destination dest = session.createQueue(queuename); 
MessageConsumer consumer = session.createConsumer(dest); 

ProducerEventSource source = new ProducerEventSource(connection, dest); 
System.out.println("Setting Producer Listener"); 
source.setProducerListener(prodevent -> { 
    System.out.println("producer status: " + prodevent.isStarted()); 
}); 
// Gets called from inside the broker's Thread and somehow causes deadlocks if I don't invoke this from the outside 
new Thread(() -> { 
    try { 
     consumer.setMessageListener(new NetworkEventPlayerAdapter(objectMapper, event, gameEventManager, playerID)); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
}).start(); 

不幸的是,TransportListener和ProducerListener都不能給我一個y當我強制退出先前作爲製作人添加的另一個應用程序時(Alt + F4),輸出y。該經紀人肯定會注意到:

WARN | Transport Connection to: tcp://127.0.0.1:58988 failed: java.net.SocketException: Connection reset 
WARN | Transport Connection to: tcp://127.0.0.1:58986 failed: java.net.SocketException: Connection reset 

但我沒有找到一種方法來獲取這些Java事件的回調。 我也嘗試在代理中設置自定義IOExceptionHandler,並在連接中添加ExceptionListener。他們也從未被召喚過。

回答

1

您可以使用諮詢主題ActiveMQ.Advisory.Connection或甚至ActiveMQ.Advisory.Producer.Queue ActiveMQ.Advisory.Producer.Topic,它們提供了關於生產者數量,檢查此鏈接http://activemq.apache.org/advisory-message.html

+0

謝謝,這解決了我。聽取具有「RemoveInfo」類型數據結構的諮詢事件正是我所需要的。 – Felk

0

一個可能的方法是分析日誌輸出來進行連接重置

WARN |傳輸連接到:tcp://127.0.0.1:58988失敗: java.net.SocketException:連接重置 警告|交通運輸 連接到:TCP://127.0.0.1:58986失敗:java.net.SocketException異常: 連接重置

由於插座中的ActiveMQ實現你必須添加的ExceptionListener有寫自己異常處理例程...

相關問題