2014-09-04 127 views
0

我試圖找到一個問題的解決方案,以保證消息只被一個消費者完全處理。ActiveMQ故障轉移重新連接消息回滾

隊列上的許多消息和一些消費者讀取消息並處理它們寫入數據庫。我的消息是經過處理的,所以如果消費者死亡,那麼消息就會返回到另一個消費者處理的隊列中。

我們必須對activemq進行主動/被動配置,這會導致問題。如果我停止激活的activemq,那麼當我使用故障轉移傳輸時,使用者將重新連接到另一個activemq。這很好,但在重新連接期間,消息被放回隊列中,並且消費者沒有意識到這種重新連接並繼續處理。這導致2個消費者處理相同消息的情況。

我本來希望使用分佈式事務管理器,這可能發生在將來,但現在我需要一個不同的解決方案。

如果我不使用故障轉移傳輸,那麼我可以掛接到JMSException偵聽器並中止使用者。不幸的是,這在使用故障轉移傳輸時不起作用。

我想要使用故障轉移傳輸進行初始連接(發現哪個activemq正在運行),然後強制故障轉移不要重新連接...或者使用允許服務器列表嘗試但不連接的不同傳輸不要重新連接...或者去找聽聽重新連接。

請注意,有時只有一臺服務器使用故障切換(重新連接)。

我可以做我在初始連接邏輯(狩獵活動服務器),但要檢查是否有另一種選擇

+0

剛纔我發現我可以使用發現來啓動一個有效的連接 – Paul 2014-09-04 09:45:33

回答

0

您可以收聽通過使用監聽器傳輸的ActiveMQConnection事件:

connection = (ActiveMQConnection)factory.createConnection(); 
    connection.addTransportListener(new TransportListener() { 
    public void onCommand(Object command) { 
     // Do something 
    } 

    public void onException(IOException error) { 
     // Do something 
    } 

    public void transportInterupted() { 
     // Do something 
    } 

    public void transportResumed() { 
     // Do something 
    } 
}); 
connection.start(); 

請注意,在此示例中,監聽器直接設置在Connection上;但是,您可以在ActiveMQConnectionFactory上設置一個實例,該實例將分配給它創建的每個Connection實例。

+0

不幸的是,我沒有連接的句柄,因爲我通過DefaultMessageListenerContainer從PooledConnectionFactory獲取它。 – Paul 2014-09-04 11:44:11

+0

看編輯,你也可以在你的ActiveMQConnectionFactory上設置一個監聽器。 – 2014-09-04 13:23:14

+0

雖然PooledConnectionfactory不公開它,但這是Spring-Boot爲您提供的。我可以解決這個問題,但是我已經進行了測試,以獲得有效的連接,然後偵聽jms異常。 – Paul 2014-09-04 14:04:56