2016-09-21 109 views
1

我試圖使用流,而不是純粹的演員來處理HTTP請求和我用下面的代碼來:如何綁定akka http與akka流?

trait ImagesRoute { 

    val log = LoggerFactory.getLogger(this.getClass) 

    implicit def actorRefFactory: ActorRefFactory 
    implicit def materializer: ActorMaterializer 

    val source = 
    Source 
     .actorRef[Image](Int.MaxValue, OverflowStrategy.fail) 
     .via(Flow[Image].mapAsync(1)(ImageRepository.add)) 
     .toMat(Sink.asPublisher(true))(Keep.both) 

    val route = { 
    pathPrefix("images") { 
     pathEnd { 
     post { 
      entity(as[Image]) { image => 

      val (ref, publisher) = source.run() 

      val addFuture = Source.fromPublisher(publisher) 

      val future = addFuture.runWith(Sink.head[Option[Image]]) 

      ref ! image 

      onComplete(future.mapTo[Option[Image]]) { 
       case Success(img) => 
       complete(Created, img) 

       case Failure(e) => 
       log.error("Error adding image resource", e) 
       complete(InternalServerError, e.getMessage) 
      } 
      } 
     } 
     } 
    } 
    } 
} 

我不知道這是,或者即使做了正確的方法這是一個好方法,或者如果我應該使用演員與路線進行交互,使用問題模式,然後在演員內部進行流式傳輸。

任何想法?

+0

如果我沒有記錯的話,你的情況你根本不需要流。據我所知,'ImageRepository.add'方法返回一個'Future';所有你需要做的就是編寫'onComplete(ImageRepository.add(image))',就這些了。 –

+0

@VladimirMatveev是的沒錯,這只是一個簡單的例子,但流管道應該做更多的事情,比如聯繫外部資源並最終回壓東西...... –

回答

4

如果你只從實體預計1幅圖像,那麼你不需要創建從一個ActorRef和Source你不需要Sink.asPublisher,你可以簡單地使用Source.single

def imageToComplete(img : Option[Image]) : StandardRoute = 
    img.map(i => complete(Created, i)) 
    .getOrElse { 
     log error ("Error adding image resource", e) 
     complete(InternalServerError, e.getMessage 
    } 

... 

entity(as[Image]) { image => 

    val future : Future[StandardRoute] = 
    Source.single(image) 
      .via(Flow[Image].mapAsync(1)(ImageRepository.add)) 
      .runWith(Sink.head[Option[Image]]) 
      .map(imageToComplete) 

    onComplete(future) 
} 

簡化你的代碼進一步,事實上,你只能處理1個圖像意味着,俱樂部是不必要的,因爲沒有必要爲背壓只有1個元素:

val future : Future[StandardRoute] = ImageRepository.add(image) 
                .map(imageToComplete) 

onComplete(future) 

在評論你表示

「這只是一個簡單的例子,但流管道應該是 做大做很多事情想聯繫的外部資源和 最終回壓的東西」

這將僅適用於您的實體是圖像流。如果您只處理每個HttpRequest 1個圖像,則背壓永不適用,並且您創建的任何流都將爲slower version of a Future

如果實體其實是在圖像流,那麼你可以使用它作爲流的一部分:

val byteStrToImage : Flow[ByteString, Image, _] = ??? 

val imageToByteStr : Flow[Image, Source[ByteString], _] = ??? 

def imageOptToSource(img : Option[Image]) : Source[Image,_] = 
    Source fromIterator img.toIterator 

val route = path("images") { 
    post { 
    extractRequestEntity { reqEntity => 

     val stream = reqEntity.via(byteStrToImage) 
          .via(Flow[Image].mapAsync(1)(ImageRepository.add)) 
          .via(Flow.flatMapConcat(imageOptToSource)) 
          .via(Flow.flatMapConcat(imageToByteStr)) 

     complete(HttpResponse(status=Created,entity = stream)) 
    } 
    } 
}  
+0

謝謝,這讓我很有意義 –

+1

@ ThiagoPereira歡迎您,愉快的黑客入侵。 –