2017-02-14 230 views
0

試圖將HiveMQ的兩個功能:共享訂閱和永久會話結合起來。HiveMQ與永久會話共享訂閱

如果創建了一個非常簡單的消息生產者。和一個非常簡單的消費者。 運行多個使用者時,所有使用者都會收到所有消息。

爲消費者設置clearSession爲'false'後,當運行消費者並重新啓動消費者時,消費者也會在未連接時收到消息。優秀。

現在將其與共享訂閱功能相結合。 僅使用共享訂閱時,clearSession爲'true'。運行多個消費者時,消息只能由單個消費者接收。它應該是循環法,情況也是如此,但是一旦停止消費者,消息就不再循環,但其中一個消費者獲得的消息明顯多於其他消息。

如果我現在再次啓用持久性會話,clearSession爲'false',並啓動共享訂閱使用者,消費者開始再次接收所有消息,而不是僅將消息發送到一個客戶端。

這裏有什麼問題? 這是HiveMQ中的錯誤嗎? 永久會話和共享訂閱是否可以一起使用?那真是一個無賴。

UPDATE 15/2/2017 作爲@fraschbi建議我清除了所有數據,並再次重新測試與持續性會話消費者共享訂閱。它似乎工作!

奇怪的是,只有第一位消費者重新連接後纔會收到錯過的消息。 所有消費者都有相同的代碼,它們只是以不同的clientId參數開始。見下面的代碼。 我的測試序列:

  • 開始消費者1:所有消息都是這個消費者。
  • 開始消費者2:每個消費者收到其他所有消息。
  • 開始消費者3:每個消費者獲得3比3的消息。
  • 停止消費者1:現在消費者2和3收到其他所有消息。 (不知道爲什麼我昨天看到了這個不均勻的分佈,但可能是@fraschbi提到的,​​因爲我正在重新使用clientId,並且沒有取消訂閱或正確斷開連接)
  • 現在停止consumer2:現在消費者3接收的所有消息。
  • 停止consumer3:不再收到消息。
  • 重新啓動consumer3:它繼續發送生產者發送的第一條消息。 它沒有收到丟失的消息
  • 重新啓動consumer2:消息再次均勻分佈。
  • 重新啓動消費者1:這一個現在收到所有丟失的消息,然後繼續接收每3個消息中的1。

所以我的新問題是:爲什麼只有第一位消費者收到丟失的消息?

注意:這裏的技巧仍然是在停止客戶端時取消訂閱,因爲訂閱/持久性設置已丟失!

Producer.scala

object Producer extends App { 

    val topic = args(0) 
    val brokerUrl = "tcp://localhost:1883" 

    val clientId = UUID.randomUUID().toString 

    val client = new MqttClient(brokerUrl, clientId) 
    client.connect() 
    val theTopic = client.getTopic(topic) 

    var count = 0 

    sys.addShutdownHook { 
    println("Disconnecting client...") 
    client.disconnect() 
    println("Disconnected.") 
    } 

    while(true) { 
    val msg = new MqttMessage(s"Message: $count".getBytes()) 
    theTopic.publish(msg) 
    println(s"Published: $msg") 

    Thread.sleep(1000) 

    count = count + 1 
    } 
} 

Consumer.scala

object Consumer extends App { 

    val topic = args(0) 
    val brokerUrl = "tcp://localhost:1883" 

    val clientId = args(1) 
// val clientId = UUID.randomUUID().toString 

    val client = new MqttClient(brokerUrl, clientId) 
    client.setCallback(new MqttCallback { 
    override def deliveryComplete(token: IMqttDeliveryToken) =() 

    override def messageArrived(topic: String, message: MqttMessage) = println(s"received on topic '$topic': ${new String(message.getPayload)}") 

    override def connectionLost(cause: Throwable) = println("Connection lost") 
    }) 

    println(s"Start $clientId consuming from topic: $topic") 
    val options = new MqttConnectOptions() 
    options.setCleanSession(false); 

    client.connect(options) 
    client.subscribe(topic) 

    sys.addShutdownHook { 
    println("Disconnecting client...") 
// client.unsubscribe(topic) 
    client.disconnect() 
    println("Disconnected.") 
    } 


    while(true) { 

    } 

} 

回答

1

我會盡量回答你單獨遇到的兩個問題。

它應該是循環法,也是這種情況,但只要您停止消費者,消息不再循環,但其中一個消費者獲得明顯更多的消息, 。

在爲共享訂閱分發郵件時,HiveMQ更喜歡在線客戶端。

如果我現在再次啓用持久性會話,clearSession爲'false',並啓動共享訂閱使用者,消費者開始接收所有消息而不是消息只是傳遞給一個客戶端。

在問題開始時,您表示您將客戶端與cleanSession=false連接到經紀商並訂閱該主題。 (聽起來好像您只使用一個主題。) 是否有可能在重新連接cleanSession=false和共享訂閱之前取消訂閱這些客戶端?在這種情況下,來自場景第一步的訂閱仍將保留給這些客戶端,當然他們每個都會收到這些消息。

編輯:

所以我的新問題是:爲什麼只有第一個消費者收到丟失的消息?

從HiveMQ用戶指南:

當用戶離線隊列已滿,該客戶端的消息將不會被丟棄,但排隊等待在共享訂閱組的下一個脫機客戶端。

當所有客戶端都處於脫機狀態時,分配不再循環。所以你描述的場景是在預期的行爲之內。

消息隊列的默認值爲1000.因此,您可以發送超過1000條消息,而客戶端處於脫機狀態,或者減少消息隊列大小。

... 
<persistence> 
    <queued-messages> 

     <max-queued-messages>50</max-queued-messages> 

    </queued-messages> 
    ... 
</persistence> 
... 

將此添加到您的​​3210用於減少消息隊列大小。

+0

是的,客戶端不會退訂。剛剛停下來。明天再試。 –

+0

重新測試並更新了問題 –