我正在嘗試爲使用Play和akka流的Websocket連接創建一個簡單的代理。 的流量是這樣的:使用Play 2.6和akka流的Websocket代理
(Client) request -> -> request (Server)
Proxy
(Client) response <- <- response (Server)
我下面的一些例子後,想出了下面的代碼:
def socket = WebSocket.accept[String, String] { request =>
val uuid = UUID.randomUUID().toString
// wsOut - actor that deals with incoming websocket frame from the Client
// wsIn - publisher of the frame for the Server
val (wsOut: ActorRef, wsIn: Publisher[String]) = {
val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail)
val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false)
source.toMat(sink)(Keep.both).run()
}
// sink that deals with the incoming messages from the Server
val serverIncoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println("The server has sent: " + message.text)
}
// source for sending a message over the WebSocket
val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
serverOutgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(serverIncoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
val actor = system.actorOf(WebSocketProxyActor.props(wsOut, uuid))
val finalFlow = {
val sink = Sink.actorRef(actor, akka.actor.Status.Success(()))
val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ???
Flow.fromSinkAndSource(sink, source)
}
finalFlow
有了這個代碼,流量從客戶到代理進入到服務器,回到代理服務器,就是這樣。它不會進一步傳達給客戶。我怎樣才能解決這個問題 ? 我想我需要以某種方式連接serverIncoming
沉到source
在finalFlow
,但我無法弄清楚如何做到這一點...
還是我完全錯了這種方法?使用Bidiflow
還是Graph
更好?我是新來的阿卡流,仍然試圖弄清楚事情。
這不是代理服務器。這是一個非常簡單的服務器,大寫字符串並使用Web套接字將它們發送回客戶端。 –
@morganfreeman我的意思是控制器本身就是代理。 UpperService可以由調用外部服務來執行實際處理的actor來替代,這些行包括:http://doc.akka.io/docs/akka/2.4/scala/stream/stream-integrations.html#Integrating_with_External_Services 。但我可能沒有正確理解你的問題。 – botkop
確實,UpperService理論上可以通過Web Socket將幀發送到服務器。最初嘗試過。該服務器的Web Socket是一個流。我無法將參與者的接收方法「連接」到流的源和流的接收器返回到「套接字」(因此數據返回到客戶端)。我可以在接收方法中初始化Web套接字流,但每次都會打開與服務器的連接。 –