2016-06-09 105 views

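How to listen for WebSocket server close events with the akka-http WebSocket client

I have a WebSocket client connected to an akka-http WebSocket server. How can I listen for connection-close events that originate on the server (i.e. the server shuts down, or the server closes the WebSocket connection)?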

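import java.util.concurrent.TimeUnit

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueue}

import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.FiniteDuration

object Client extends App {

    implicit val actorSystem = ActorSystem("akka-system")
    implicit val flowMaterializer = ActorMaterializer()
    // execution context required by the Future callbacks (map, onComplete) below
    import actorSystem.dispatcher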

    val config = actorSystem.settings.config 
    val interface = config.getString("app.interface") 

    val port = config.getInt("app.port") 


    // print each incoming strict text message
    val printSink: Sink[Message, Future[Done]] =
      Sink.foreach {
        case message: TextMessage.Strict =>
          println(message.text)

        case _ =>
          // anything else (streamed text or binary): report it back through the queue
          sourceQueue.foreach { q =>
            println("offering message on client")
            q.offer(TextMessage("received unknown"))
          }
          println("received unknown message format")
      }

    val (source, sourceQueue) = {
      val p = Promise[SourceQueue[Message]]()
      val s = Source.queue[Message](Int.MaxValue, OverflowStrategy.backpressure)
        .mapMaterializedValue { m =>
          p.trySuccess(m)
          m
        }
        .keepAlive(FiniteDuration(1, TimeUnit.SECONDS), () => TextMessage.Strict("Heart Beat"))
      (s, p.future)
    }

    val flow = 
    Flow.fromSinkAndSourceMat(printSink, source)(Keep.right) 


    val (upgradeResponse, sourceClose) = 
    Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/ws-echo"), flow) 

    val connected = upgradeResponse.map { upgrade =>
      // just like a regular http request we can get 404 NotFound,
      // with a response body, that will be available from upgrade.response
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Done
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }


    connected.onComplete(println) 

} 

Answer


WebSocket connection termination is modeled as regular stream completion, so in your case you can use the materialized Future[Done] produced by Sink.foreach:

val flow = Flow.fromSinkAndSourceMat(printSink, source)(Keep.both) 

val (upgradeResponse, (sinkClose, sourceClose)) = 
    Http().singleWebSocketRequest(..., flow) 

// needs scala.util.{Failure, Success} and an implicit ExecutionContext in scope
sinkClose.onComplete {
    case Success(_) => println("Connection closed gracefully")
    case Failure(e) => println(s"Connection closed with an error: $e")
}

Thanks :) How can I reconnect (retry) when the connection closes? – invariant


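That depends on your architecture. I think the simplest approach is to extract the 'singleWebSocketRequest()' call and the 'sinkClose.onComplete' handler into a method, and call that method recursively from the 'onComplete' handler. –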


OK, ty. I wish they supported this out of the box, like socket.io: http://stackoverflow.com/questions/13797262/how-to-re-connect-to-websocket-after-close-connection – invariant
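
The reconnect approach suggested in the comments above could look roughly like the sketch below. It uses only the Scala standard library: `connect` is a hypothetical stand-in for a method that wraps the `Http().singleWebSocketRequest(...)` call and returns `sinkClose`, and `runWithReconnect` and `attemptsLeft` are illustrative names, not part of akka-http. Treat it as a minimal sketch under those assumptions rather than a complete implementation.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object Reconnect {

  // `connect` is assumed to open one WebSocket connection and return a Future
  // that completes when that connection closes (e.g. the sinkClose value).
  // `attemptsLeft` is the total number of connections this call may still open.
  def runWithReconnect(connect: () => Future[Unit], attemptsLeft: Int)
                      (implicit ec: ExecutionContext): Future[Unit] =
    connect()
      // Map graceful close and failure to a description so flatMap runs in both cases.
      .map(_ => "closed gracefully")
      .recover { case NonFatal(e) => s"closed with an error: $e" }
      .flatMap { reason =>
        println(s"Connection $reason")
        if (attemptsLeft > 1) runWithReconnect(connect, attemptsLeft - 1) // reconnect
        else Future.successful(())                                        // give up
      }
}
```

With the client from the question, `connect` would materialize a fresh flow on every call, since a materialized stream cannot be reused. This sketch reconnects immediately; real code would probably wait between attempts (for example with `akka.pattern.after`) so that a server that is down is not hammered with requests.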
