2017-04-07 92 views
1

嗨我正在嘗試編寫一個測試用例來實現故障轉移支持activeMQ。在停止和重新啓動時丟失消息Embedded ActiveMQ

下面是代碼

val brokerA = createBroker("A") 
brokerA.start 
val failoverUrl = s"failover:(vm://BrokerA?create=false)" + 
s"?randomize=false&maxReconnectAttempts=-1&reconnectSupported=true" 


val cFactory = new ActiveMQConnectionFactory(failoverUrl) 
val qConnection = getQueueConnection 
val session = createQueueSession(qConnection) 

private def totalReadMessagesCount(queueReceiver: QueueReceiver) = { 
val messages = Iterator.continually(Option(queueReceiver.receive(2000))).takeWhile(_.isDefined).flatten.toSeq 
messages.size 
} 

private def getReceiver = { 
val queueConnection = getQueueConnection 
queueConnection.start() 
val queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE) 
val queueReceiver = createQueueReceiver(queueSession, brokerA.getBrokerName) 
queueReceiver 
} 

def getQueueConnection =cFactory.createQueueConnection("admin", "") 

def createBroker(name:String) = { 
val broker = new BrokerService() 
val adaptor = new KahaDBPersistenceAdapter() 
broker.setBrokerName("Broker" + name) 
broker.addConnector(getBrokerUrl) 
broker.setPersistent(true) 
broker.setUseJmx(false) 
broker.setUseShutdownHook(false) 
broker 
} 

def getBrokerUrl = "tcp://localhost:0" 


val queueReceiver: QueueReceiver = getReceiver 
val messageCount = 500 
(1 to messageCount) map {count => 
    //Calling method to send message to ActiveMQ 
    if(count == 200){ 
    brokerA.stop() 
    brokerA.waitUntilStopped() 
    brokerA.start(true) 
    } 
} 
val totalCount = totalReadMessagesCount(queueReceiver) 
println(s"Read ${totalCount} messages") 
assert(totalCount == messageCount) 

我能夠重新啓動後和ActiveMQ重新連接,但totalCount顯示的是300而不是500,似乎之前的消息都將丟失。但是,當我在非嵌入模式下運行相同的場景。我能夠獲取所有消息。

請幫助我如何防止在重新啓動嵌入式活動mq時丟失任何消息。

+0

你需要有ActiveMQ的兩(2)實例測試故障轉移。故障轉移意味着如果客戶端無法與給定的服務器通話,它將嘗試連接字符串中服務器列表中的下一個服務器。您的連接字符串中只有一臺服務器。看到這裏的文檔:http://activemq.apache.org/failover-transport-reference.html – zloster

回答

1

你必須persistent設置爲真,我不知道斯卡拉,但這裏是Java代碼

public BrokerService broker() throws Exception { 
    final BrokerService broker = new BrokerService(); 
    //broker.addConnector("tcp://localhost:61616"); 
    broker.addConnector("stomp://localhost:61613"); 
    broker.addConnector("vm://localhost"); 
    PersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); 
    File dir = new File(System.getProperty("user.home") + File.separator + "kaha"); 
    if (!dir.exists()) { 
     dir.mkdirs(); 
    } 
    persistenceAdapter.setDirectory(dir); 
    broker.setPersistenceAdapter(persistenceAdapter); 
    broker.setPersistent(true); 
    return broker; 
} 
+0

我剛剛更新我的例子。我堅持真實並加入了KahaDB。但是現在我已經有305人了。任何建議? –

+0

您已經創建了KahaDBPersistenceAdapter的一個實例,但是您沒有將其設置爲代理?像這樣broker.setPersistenceAdapter(adapter); –