如何從Akka HTTP路由發送元素/消息到Akka接收器?我的HTTP路由仍然需要返回一個正常的HTTP響應。akka-http:從http路由發送元素到akka接收器
我想這需要一個流分支/結。正常的HTTP路由是來自HttpRequest - > HttpResponse的流。我想添加一個分支/聯結,以便HttpRequests可以觸發事件到我單獨的接收器以及生成正常的HttpResponse。
下面是一個非常簡單的單路akka-http應用程序。爲了簡單起見,我使用了一個簡單的println接收器。我的生產用例顯然會涉及一個不太平凡的下沉。
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("my-akka-http-test")
val executor = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()(actorSystem)
// I would like to send elements to this sink in response to HTTP GET operations.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val route: akka.http.scaladsl.server.Route =
path("hello"/Segment) { p =>
get {
// I'd like to send a message to an Akka Sink as well as return an HTTP response.
complete {
s"<h1>Say hello to akka-http. p=$p</h1>"
}
}
}
val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route), "localhost", 8080)
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture
.flatMap(_.unbind())(executor) // trigger unbinding from the port
.onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}
編輯:或者在使用低級別阿卡-HTTP API,我怎麼能發送特定消息到沉從特定路由處理?
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("my-akka-http-test")
val executor = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()(actorSystem)
// I would like to send elements to this sink in response to HTTP GET operations.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
HttpResponse(entity = HttpEntity(
ContentTypes.`text/html(UTF-8)`,
"<html><body>Hello world!</body></html>"))
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
sys.error("BOOM!")
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
val serverSource = Http().bind(interface = "localhost", port = 8080)
val bindingFuture: Future[Http.ServerBinding] =
serverSource.to(Sink.foreach { connection =>
println("Accepted new connection from " + connection.remoteAddress)
connection handleWithSyncHandler requestHandler
// this is equivalent to
// connection handleWith { Flow[HttpRequest] map requestHandler }
}).run()
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture
.flatMap(_.unbind())(executor) // trigger unbinding from the port
.onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}
您的建議添加將整個HttpRequest發送到接收器,我想只是一個特定的路由發送特定的消息到接收器。 – clay
gotcha,我添加了更多信息 –
謝謝!你能告訴我怎麼用低級別的API來做到這一點嗎? – clay