2017-04-12 65 views
5

我正在嘗試爲使用Play和akka流的Websocket連接創建一個簡單的代理。 的流量是這樣的:使用Play 2.6和akka流的Websocket代理

(Client) request ->   -> request (Server) 
         Proxy 
(Client) response <-   <- response (Server) 

我下面的一些例子後,想出了下面的代碼:

def socket = WebSocket.accept[String, String] { request => 

val uuid = UUID.randomUUID().toString 

// wsOut - actor that deals with incoming websocket frame from the Client 
// wsIn - publisher of the frame for the Server 
val (wsOut: ActorRef, wsIn: Publisher[String]) = { 
    val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail) 
    val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false) 
    source.toMat(sink)(Keep.both).run() 
} 

// sink that deals with the incoming messages from the Server 
val serverIncoming: Sink[Message, Future[Done]] = 
    Sink.foreach[Message] { 
    case message: TextMessage.Strict => 
     println("The server has sent: " + message.text) 
    } 

// source for sending a message over the WebSocket 
val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_)) 

// flow to use (note: not re-usable!) 
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000")) 

// the materialized value is a tuple with 
// upgradeResponse is a Future[WebSocketUpgradeResponse] that 
// completes or fails when the connection succeeds or fails 
// and closed is a Future[Done] with the stream completion from the incoming sink 
val (upgradeResponse, closed) = 
serverOutgoing 
    .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse] 
    .toMat(serverIncoming)(Keep.both) // also keep the Future[Done] 
    .run() 

// just like a regular http request we can access response status which is available via upgrade.response.status 
// status code 101 (Switching Protocols) indicates that server support WebSockets 
val connected = upgradeResponse.flatMap { upgrade => 
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
    Future.successful(Done) 
    } else { 
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") 
    } 
} 

// in a real application you would not side effect here 
connected.onComplete(println) 
closed.foreach(_ => println("closed")) 

val actor = system.actorOf(WebSocketProxyActor.props(wsOut, uuid)) 
val finalFlow = { 
    val sink = Sink.actorRef(actor, akka.actor.Status.Success(())) 
    val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ??? 
    Flow.fromSinkAndSource(sink, source) 
} 

finalFlow 

有了這個代碼,流量從客戶到代理進入到服務器,回到代理服務器,就是這樣。它不會進一步傳達給客戶。我怎樣才能解決這個問題 ? 我想我需要以某種方式連接serverIncoming沉到sourcefinalFlow,但我無法弄清楚如何做到這一點...

還是我完全錯了這種方法?使用Bidiflow還是Graph更好?我是新來的阿卡流,仍然試圖弄清楚事情。

回答

3

以下似乎工作。注意:我已經在同一個控制器中實現了服務器套接字和代理套接字,但是可以拆分它們或在單獨的實例上部署相同的控制器。在這兩種情況下,需要更新'上'服務的ws網址。

package controllers 

import javax.inject._ 

import akka.actor.{Actor, ActorRef, ActorSystem, Props} 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse} 
import akka.stream.Materializer 
import akka.stream.scaladsl.Flow 
import play.api.libs.streams.ActorFlow 
import play.api.mvc._ 

import scala.concurrent.{ExecutionContext, Future} 
import scala.language.postfixOps 

@Singleton 
class SomeController @Inject()(implicit exec: ExecutionContext, 
           actorSystem: ActorSystem, 
           materializer: Materializer) extends Controller { 

    /*--- proxy ---*/ 
    def websocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] = 
    Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/upper-socket")) 

    def proxySocket: WebSocket = WebSocket.accept[String, String] { _ => 
    Flow[String].map(s => TextMessage(s)) 
     .via(websocketFlow) 
     .map(_.asTextMessage.getStrictText) 
    } 

    /*--- server ---*/ 
    class UpperService(socket: ActorRef) extends Actor { 
    override def receive: Receive = { 
     case s: String => socket ! s.toUpperCase() 
     case _ => 
    } 
    } 

    object UpperService { 
    def props(socket: ActorRef): Props = Props(new UpperService(socket)) 
    } 

    def upperSocket: WebSocket = WebSocket.accept[String, String] { _ => 
    ActorFlow.actorRef(out => UpperService.props(out)) 
    } 
} 

您需要的路由進行設置是這樣的://本地主機:9000 /代理插槽

GET /upper-socket controllers.SomeController.upperSocket 
GET /proxy-socket controllers.SomeController.proxySocket 

您可以通過發送一個字符串,WS測試。答案將是大寫字符串。

會有1分鐘後不活動的,雖然超時:

akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [localhost:9000], no bytes passed in the last 1 minute 

但見:如何配置此http://doc.akka.io/docs/akka-http/current/scala/http/common/timeouts.html

+0

這不是代理服務器。這是一個非常簡單的服務器,大寫字符串並使用Web套接字將它們發送回客戶端。 –

+0

@morganfreeman我的意思是控制器本身就是代理。 UpperService可以由調用外部服務來執行實際處理的actor來替代,這些行包括:http://doc.akka.io/docs/akka/2.4/scala/stream/stream-integrations.html#Integrating_with_External_Services 。但我可能沒有正確理解你的問題。 – botkop

+0

確實,UpperService理論上可以通過Web Socket將幀發送到服務器。最初嘗試過。該服務器的Web Socket是一個流。我無法將參與者的接收方法「連接」到流的源和流的接收器返回到「套接字」(因此數據返回到客戶端)。我可以在接收方法中初始化Web套接字流,但每次都會打開與服務器的連接。 –

2

代理需要提供兩個流(代理流A/B):

(Client) request -> Proxy Flow A -> request (Server) 

(Client) response <- Proxy Flow B <- response (Server) 

一個選項來實現這種代理流被使用ActorSubscriber和SourceQueue:

class Subscriber[T](proxy: ActorRef) extends ActorSubscriber { 
    private var queue = Option.empty[SourceQueueWithComplete[T]] 
    def receive = { 
    case Attach(sourceQueue) => queue = Some(sourceQueue) 
    case msg: T => // wait until queue attached and pass forward all msgs to queue and the proxy actor 
    } 
} 

def proxyFlow[T](proxy: ActorRef): Flow[T, ActorRef] = { 
    val sink = Sink.actorSubscriber(Props(new Subscriber[T](proxy))) 
    val source = Source.queue[T](...) 
    Flow.fromSinkAndSourceMat(sink, source){ (ref, queue) => 
    ref ! Attach(queue) 
    ref 
    } 
} 

然後可以組裝客戶端流程如下:

val proxy = actorOf(...) 
val requestFlow = proxyFlow[Request](proxy) 
val responseFlow = proxyFlow[Response](proxy) 
val finalFlow: Flow[Request, Response] = 
    requestFlow.via(webSocketFlow).via(responseFlow) 
2

首先你需要一些akka進口:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.ws.WebSocketRequest 
import akka.http.scaladsl.model.ws.Message 
import akka.http.scaladsl.model.HttpRequest 
import akka.http.scaladsl.model.HttpResponse 
import akka.stream.scaladsl.Flow 
import akka.http.scaladsl.server.Directives.{ extractUpgradeToWebSocket, complete } 

這是創建一個WebSocket代理,端口800.0.0.0結合,代理方式來ws://echo.websocket.org一個例子App

object WebSocketProxy extends App { 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    private[this] def manipulateFlow: Flow[Message, Message, akka.NotUsed] = ??? 

    private[this] def webSocketFlow = 
    Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org")) 

    private[this] val route: Flow[HttpRequest, HttpResponse, Any] = 
    extractUpgradeToWebSocket { upgrade => 
     val webSocketFlowProxy = manipulateFlow via webSocketFlow 
     val handleWebSocketProxy = upgrade.handleMessages(webSocketFlowProxy) 
     complete(handleWebSocketProxy) 
    } 

    private[this] val proxyBindingFuture = 
    Http().bindAndHandle(route, "0.0.0.0", 80) 

    println(s"Server online\nPress RETURN to stop...") 
    Console.readLine() 
} 

你必須去適應它爲play和爲貴應用結構。

注:

  • 記得要解除綁定proxyBindingFuture並終止在生產中system;
  • 只有當您想要操作消息時,您才需要manipulateFlow