2017-02-24 66 views
2

如何從Akka HTTP路由發送元素/消息到Akka接收器?我的HTTP路由仍然需要返回一個正常的HTTP響應。akka-http:從http路由發送元素到akka接收器

我想這需要一個流分支/結。正常的HTTP路由是來自HttpRequest - > HttpResponse的流。我想添加一個分支/聯結,以便HttpRequests可以觸發事件到我單獨的接收器以及生成正常的HttpResponse。

下面是一個非常簡單的單路akka-http應用程序。爲了簡單起見,我使用了一個簡單的println接收器。我的生產用例顯然會涉及一個不太平凡的下沉。

def main(args: Array[String]): Unit = { 
    implicit val actorSystem = ActorSystem("my-akka-http-test") 
    val executor = actorSystem.dispatcher 
    implicit val materializer = ActorMaterializer()(actorSystem) 

    // I would like to send elements to this sink in response to HTTP GET operations. 
    val sink: Sink[Any, Future[Done]] = Sink.foreach(println) 

    val route: akka.http.scaladsl.server.Route = 
    path("hello"/Segment) { p => 
     get { 
     // I'd like to send a message to an Akka Sink as well as return an HTTP response. 
     complete { 
      s"<h1>Say hello to akka-http. p=$p</h1>" 
     } 
     } 
    } 

    val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem) 
    val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route), "localhost", 8080) 

    println("Server online at http://localhost:8080/") 
    println("Press RETURN to stop...") 
    scala.io.StdIn.readLine() 

    bindingFuture 
    .flatMap(_.unbind())(executor) // trigger unbinding from the port 
    .onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done 
} 

編輯:或者在使用低級別阿卡-HTTP API,我怎麼能發送特定消息到沉從特定路由處理?

def main(args: Array[String]): Unit = { 
    implicit val actorSystem = ActorSystem("my-akka-http-test") 
    val executor = actorSystem.dispatcher 
    implicit val materializer = ActorMaterializer()(actorSystem) 

    // I would like to send elements to this sink in response to HTTP GET operations. 
    val sink: Sink[Any, Future[Done]] = Sink.foreach(println) 

    val requestHandler: HttpRequest => HttpResponse = { 
    case HttpRequest(GET, Uri.Path("/"), _, _, _) => 
     HttpResponse(entity = HttpEntity(
     ContentTypes.`text/html(UTF-8)`, 
     "<html><body>Hello world!</body></html>")) 

    case HttpRequest(GET, Uri.Path("/ping"), _, _, _) => 
     HttpResponse(entity = "PONG!") 

    case HttpRequest(GET, Uri.Path("/crash"), _, _, _) => 
     sys.error("BOOM!") 

    case r: HttpRequest => 
     r.discardEntityBytes() // important to drain incoming HTTP Entity stream 
     HttpResponse(404, entity = "Unknown resource!") 
    } 

    val serverSource = Http().bind(interface = "localhost", port = 8080) 

    val bindingFuture: Future[Http.ServerBinding] = 
    serverSource.to(Sink.foreach { connection => 
     println("Accepted new connection from " + connection.remoteAddress) 

     connection handleWithSyncHandler requestHandler 
     // this is equivalent to 
     // connection handleWith { Flow[HttpRequest] map requestHandler } 
    }).run() 

    println("Server online at http://localhost:8080/") 
    println("Press RETURN to stop...") 
    scala.io.StdIn.readLine() 

    bindingFuture 
    .flatMap(_.unbind())(executor) // trigger unbinding from the port 
    .onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done 
} 

回答

2

如果你想整個HttpRequest發送給你的一個水槽,我想說的最簡單的方法是使用alsoTo組合子。其結果將是沿着

​​

僅供參考線的東西:alsoTo其實隱藏着Broadcast階段。

IF相反,您需要選擇性地將消息從特定的子路由發送到接收器,您別無選擇,只能爲每個傳入請求實現新的流。見下面的例子

val sink: Sink[Any, Future[Done]] = Sink.foreach(println) 

val route: akka.http.scaladsl.server.Route = 
    path("hello"/Segment) { p => 
    get { 

     (extract(_.request) & extractMaterializer) { (req, mat) ⇒ 
     Source.single(req).runWith(sink)(mat) 

     complete { 
      s"<h1>Say hello to akka-http. p=$p</h1>" 
     } 
     } 
    } 
    } 

而且,記住,你總是可以完全拋棄高層DSL,並使用lower-level streams DSL模型,你整條路線。這將導致更詳細的代碼 - 但會給你完全控制你的流實現。

編輯:例如低於

val sink: Sink[Any, Future[Done]] = Sink.foreach(println) 

val handlerFlow = 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    val partition = b.add(Partition[HttpRequest](2, { 
     case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ 0 
     case _          ⇒ 1 
    })) 
    val merge = b.add(Merge[HttpResponse](2)) 

    val happyPath = Flow[HttpRequest].map{ req ⇒ 
     HttpResponse(entity = HttpEntity(
     ContentTypes.`text/html(UTF-8)`, 
     "<html><body>Hello world!</body></html>")) 
    }   

    val unhappyPath = Flow[HttpRequest].map{ 
     case HttpRequest(GET, Uri.Path("/ping"), _, _, _) => 
     HttpResponse(entity = "PONG!") 

     case HttpRequest(GET, Uri.Path("/crash"), _, _, _) => 
     sys.error("BOOM!") 

     case r: HttpRequest => 
     r.discardEntityBytes() // important to drain incoming HTTP Entity stream 
     HttpResponse(404, entity = "Unknown resource!") 
    } 

    partition.out(0).alsoTo(sink) ~> happyPath ~> merge 
    partition.out(1)    ~> unhappyPath ~> merge 

    FlowShape(partition.in, merge.out) 
    }) 

val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080) 
+0

您的建議添加將整個HttpRequest發送到接收器,我想只是一個特定的路由發送特定的消息到接收器。 – clay

+0

gotcha,我添加了更多信息 –

+0

謝謝!你能告訴我怎麼用低級別的API來做到這一點嗎? – clay

0

這是我用似乎理想的解決方案。阿卡Http似乎是這樣設計的,以便您的路由很簡單HttpRequest-> HttpResponse流並且不涉及任何額外的分支。

與其將所有內容都構建到單個Akka流圖中,我有一個單獨的QueueSource-> Sink圖,正常的Akka Http HttpRequest-> HttpResponse流只是根據需要向源隊列添加元素。

object HttpWithSinkTest { 
    def buildQueueSourceGraph(): RunnableGraph[(SourceQueueWithComplete[String], Future[Done])] = { 
    val annotateMessage: Flow[String, String, NotUsed] = Flow.fromFunction[String, String](s => s"got message from queue: $s") 

    val sourceQueue = Source.queue[String](100, OverflowStrategy.dropNew) 
    val sink: Sink[String, Future[Done]] = Sink.foreach(println) 
    val annotatedSink = annotateMessage.toMat(sink)(Keep.right) 
    val queueGraph = sourceQueue.toMat(annotatedSink)(Keep.both) 

    queueGraph 
    } 

    def buildHttpFlow(queue: SourceQueueWithComplete[String], 
        actorSystem: ActorSystem, materializer: ActorMaterializer): Flow[HttpRequest, HttpResponse, NotUsed] = { 
    implicit val actorSystemI = actorSystem 
    implicit val materializerI = materializer 

    val route: akka.http.scaladsl.server.Route = 
     path("hello"/Segment) { p => 
     get { 
      complete { 
      queue.offer(s"got http event p=$p") 

      s"<h1>Say hello to akka-http. p=$p</h1>" 
      } 
     } 
     } 

    val routeFlow = RouteResult.route2HandlerFlow(route) 

    routeFlow 
    } 

    def main(args: Array[String]): Unit = { 
    val actorSystem = ActorSystem("my-akka-http-test") 
    val executor = actorSystem.dispatcher 
    implicit val materializer = ActorMaterializer()(actorSystem) 

    val (queue, _) = buildQueueSourceGraph().run()(materializer) 

    val httpFlow = buildHttpFlow(queue, actorSystem, materializer) 
    val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem) 
    val bindingFuture = httpExt.bindAndHandle(httpFlow, "localhost", 8080) 

    println("Server online at http://localhost:8080/") 
    println("Press RETURN to stop...") 
    scala.io.StdIn.readLine() 

    println("Shutting down...") 

    val serverBinding = Await.result(bindingFuture, Duration.Inf) 
    Await.result(serverBinding.unbind(), Duration.Inf) 
    Await.result(actorSystem.terminate(), Duration.Inf) 

    println("Done. Exiting") 
    } 
}