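How to listen for WebSocket server close events using an akka-http WebSocket client

I have a WebSocket client connected to an akka-http WebSocket server. How can I listen for connection-close events that happen on the server (i.e. the server shuts down, or the server closes the WebSocket connection)?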
import java.util.concurrent.TimeUnit

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueue}

import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.FiniteDuration

object Client extends App {
  implicit val actorSystem = ActorSystem("akka-system")
  implicit val flowMaterializer = ActorMaterializer()
  // execution context required by the Future callbacks below
  import actorSystem.dispatcher

  val config = actorSystem.settings.config
  val interface = config.getString("app.interface")
  val port = config.getInt("app.port")

  // print each incoming strict text message
  val printSink: Sink[Message, Future[Done]] =
    Sink.foreach {
      case message: TextMessage.Strict =>
        println(message.text)
      case _ =>
        // anything else: report it back to the server through the queue
        sourceQueue.map { q =>
          println(s"offering message on client")
          q.offer(TextMessage("received unknown"))
        }
        println(s"received unknown message format")
    }

  // outgoing messages come from a queue; the promise exposes the materialized queue
  val (source, sourceQueue) = {
    val p = Promise[SourceQueue[Message]]
    val s = Source.queue[Message](Int.MaxValue, OverflowStrategy.backpressure).mapMaterializedValue { m =>
      p.trySuccess(m)
      m
    }
      .keepAlive(FiniteDuration(1, TimeUnit.SECONDS), () => TextMessage.Strict("Heart Beat"))
    (s, p.future)
  }

  // Keep.right keeps the source's materialized value (the queue), not the sink's Future[Done]
  val flow =
    Flow.fromSinkAndSourceMat(printSink, source)(Keep.right)

  val (upgradeResponse, sourceClose) =
    Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/ws-echo"), flow)

  val connected = upgradeResponse.map { upgrade =>
    // just like a regular http request we can get 404 NotFound,
    // with a response body, that will be available from upgrade.response
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Done
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }
  }

  connected.onComplete(println)
}
Thanks :), how can I reconnect (retry) on close? – invariant
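It depends on your architecture. I think the simplest way is to extract the 'singleWebSocketRequest()' call and the 'sinkClose.onComplete' handler into a method, and call that method recursively from the 'onComplete' handler. –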
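The suggestion above could be sketched roughly as follows. This is only an illustration, not tested against a real server. It assumes the flow is built with Keep.left, so that the materialized value is the sink's Future[Done] (called sinkClose here, as in the comment), and that this future completes or fails when the server closes the connection. The names ReconnectingClient, connect and retryDelay, and the fixed two-second delay, are made up for the example. Source.maybe is used instead of the queue from the question to keep the sketch short.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Future
import scala.concurrent.duration._

object ReconnectingClient extends App {
  implicit val actorSystem = ActorSystem("akka-system")
  implicit val flowMaterializer = ActorMaterializer()
  import actorSystem.dispatcher

  // Assumption: a fixed delay between attempts; a real client may want backoff.
  val retryDelay = 2.seconds

  def connect(): Unit = {
    // print each incoming strict text message
    val printSink: Sink[Message, Future[Done]] =
      Sink.foreach {
        case message: TextMessage.Strict => println(message.text)
        case _                           => println("received unknown message format")
      }

    // Source.maybe emits nothing and does not complete by itself,
    // so the client side of the connection stays open.
    // Keep.left keeps the sink's Future[Done] as the materialized value.
    val flow =
      Flow.fromSinkAndSourceMat(printSink, Source.maybe[Message])(Keep.left)

    val (upgradeResponse, sinkClose) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/ws-echo"), flow)

    upgradeResponse.foreach(upgrade => println(s"upgrade status: ${upgrade.response.status}"))

    // Runs when the incoming side of the stream ends, normally or with a failure;
    // schedule a fresh connection attempt from here.
    sinkClose.onComplete { result =>
      println(s"connection closed: $result")
      actorSystem.scheduler.scheduleOnce(retryDelay)(connect())
    }
  }

  connect()
}
```

Because each attempt is scheduled rather than called directly, the "recursion" does not grow the stack. How a failed initial connection shows up on sinkClose is worth checking against the akka-http version in use.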
OK, ty. I wish they supported this out of the box, like socket.io: http://stackoverflow.com/questions/13797262/how-to-re-connect-to-websocket-after-close-connection – invariant