嗨我正在嘗試編寫一個測試用例來實現故障轉移支持activeMQ。在停止和重新啓動時丟失消息Embedded ActiveMQ
下面是代碼
val brokerA = createBroker("A")
brokerA.start
val failoverUrl = s"failover:(vm://BrokerA?create=false)" +
s"?randomize=false&maxReconnectAttempts=-1&reconnectSupported=true"
val cFactory = new ActiveMQConnectionFactory(failoverUrl)
val qConnection = getQueueConnection
val session = createQueueSession(qConnection)
private def totalReadMessagesCount(queueReceiver: QueueReceiver) = {
val messages = Iterator.continually(Option(queueReceiver.receive(2000))).takeWhile(_.isDefined).flatten.toSeq
messages.size
}
private def getReceiver = {
val queueConnection = getQueueConnection
queueConnection.start()
val queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
val queueReceiver = createQueueReceiver(queueSession, brokerA.getBrokerName)
queueReceiver
}
def getQueueConnection =cFactory.createQueueConnection("admin", "")
def createBroker(name:String) = {
val broker = new BrokerService()
val adaptor = new KahaDBPersistenceAdapter()
broker.setBrokerName("Broker" + name)
broker.addConnector(getBrokerUrl)
broker.setPersistent(true)
broker.setUseJmx(false)
broker.setUseShutdownHook(false)
broker
}
def getBrokerUrl = "tcp://localhost:0"
val queueReceiver: QueueReceiver = getReceiver
val messageCount = 500
(1 to messageCount) map {count =>
//Calling method to send message to ActiveMQ
if(count == 200){
brokerA.stop()
brokerA.waitUntilStopped()
brokerA.start(true)
}
}
val totalCount = totalReadMessagesCount(queueReceiver)
println(s"Read ${totalCount} messages")
assert(totalCount == messageCount)
我能夠重新啓動後和ActiveMQ重新連接,但totalCount
顯示的是300而不是500,似乎之前的消息都將丟失。但是,當我在非嵌入模式下運行相同的場景。我能夠獲取所有消息。
請幫助我如何防止在重新啓動嵌入式活動mq時丟失任何消息。
你需要有ActiveMQ的兩(2)實例測試故障轉移。故障轉移意味着如果客戶端無法與給定的服務器通話,它將嘗試連接字符串中服務器列表中的下一個服務器。您的連接字符串中只有一臺服務器。看到這裏的文檔:http://activemq.apache.org/failover-transport-reference.html – zloster