試圖將HiveMQ的兩個功能:共享訂閱和永久會話結合起來。HiveMQ與永久會話共享訂閱
如果創建了一個非常簡單的消息生產者。和一個非常簡單的消費者。 運行多個使用者時,所有使用者都會收到所有消息。
爲消費者設置clearSession爲'false'後,當運行消費者並重新啓動消費者時,消費者也會在未連接時收到消息。優秀。
現在將其與共享訂閱功能相結合。 僅使用共享訂閱時,clearSession爲'true'。運行多個消費者時,消息只能由單個消費者接收。它應該是循環法,情況也是如此,但是一旦停止消費者,消息就不再循環,但其中一個消費者獲得的消息明顯多於其他消息。
如果我現在再次啓用持久性會話,clearSession爲'false',並啓動共享訂閱使用者,消費者開始再次接收所有消息,而不是僅將消息發送到一個客戶端。
這裏有什麼問題? 這是HiveMQ中的錯誤嗎? 永久會話和共享訂閱是否可以一起使用?那真是一個無賴。
UPDATE 15/2/2017 作爲@fraschbi建議我清除了所有數據,並再次重新測試與持續性會話消費者共享訂閱。它似乎工作!
奇怪的是,只有第一位消費者重新連接後纔會收到錯過的消息。 所有消費者都有相同的代碼,它們只是以不同的clientId參數開始。見下面的代碼。 我的測試序列:
- 開始消費者1:所有消息都是這個消費者。
- 開始消費者2:每個消費者收到其他所有消息。
- 開始消費者3:每個消費者獲得3比3的消息。
- 停止消費者1:現在消費者2和3收到其他所有消息。 (不知道爲什麼我昨天看到了這個不均勻的分佈,但可能是@fraschbi提到的,因爲我正在重新使用clientId,並且沒有取消訂閱或正確斷開連接)
- 現在停止consumer2:現在消費者3接收的所有消息。
- 停止consumer3:不再收到消息。
- 重新啓動consumer3:它繼續發送生產者發送的第一條消息。 它沒有收到丟失的消息。
- 重新啓動consumer2:消息再次均勻分佈。
- 重新啓動消費者1:這一個現在收到所有丟失的消息,然後繼續接收每3個消息中的1。
所以我的新問題是:爲什麼只有第一位消費者收到丟失的消息?
注意:這裏的技巧仍然是在停止客戶端時取消訂閱,因爲訂閱/持久性設置已丟失!
Producer.scala
object Producer extends App {
val topic = args(0)
val brokerUrl = "tcp://localhost:1883"
val clientId = UUID.randomUUID().toString
val client = new MqttClient(brokerUrl, clientId)
client.connect()
val theTopic = client.getTopic(topic)
var count = 0
sys.addShutdownHook {
println("Disconnecting client...")
client.disconnect()
println("Disconnected.")
}
while(true) {
val msg = new MqttMessage(s"Message: $count".getBytes())
theTopic.publish(msg)
println(s"Published: $msg")
Thread.sleep(1000)
count = count + 1
}
}
Consumer.scala
object Consumer extends App {
val topic = args(0)
val brokerUrl = "tcp://localhost:1883"
val clientId = args(1)
// val clientId = UUID.randomUUID().toString
val client = new MqttClient(brokerUrl, clientId)
client.setCallback(new MqttCallback {
override def deliveryComplete(token: IMqttDeliveryToken) =()
override def messageArrived(topic: String, message: MqttMessage) = println(s"received on topic '$topic': ${new String(message.getPayload)}")
override def connectionLost(cause: Throwable) = println("Connection lost")
})
println(s"Start $clientId consuming from topic: $topic")
val options = new MqttConnectOptions()
options.setCleanSession(false);
client.connect(options)
client.subscribe(topic)
sys.addShutdownHook {
println("Disconnecting client...")
// client.unsubscribe(topic)
client.disconnect()
println("Disconnected.")
}
while(true) {
}
}
是的,客戶端不會退訂。剛剛停下來。明天再試。 –
重新測試並更新了問題 –