2017-12-27 292 views
1

目的是從數據庫流數據,在這個組塊數據的執行一些計算(該計算返回一些情況下類的未來)發送分塊響應,併發送該數據作爲分塊的響應給用戶。目前,我能夠流式傳輸數據併發送響應,而無需執行任何計算。但是,我無法執行此計算,然後傳輸結果。轉化油滑流數據,並使用阿卡的Http

這是我實施的路線。

def streamingDB1 = 
path("streaming-db1") { 
    get { 
    val src = Source.fromPublisher(db.stream(getRds)) 
    complete(src) 
    } 
} 

函數getRds返回映射到案例類(使用光滑)的表的行。現在考慮功能計算這需要每一行作爲輸入並返回另一個案例類的未來。喜歡的東西

def compute(x: Tweet) : Future[TweetNew] = ? 

我如何能實現可變SRC此功能,並將其發送給計算用戶的分塊響應(如流)。

回答

1

你可以使用變換的mapAsync來源:

val src = 
    Source.fromPublisher(db.stream(getRds)) 
     .mapAsync(parallelism = 3)(compute) 

complete(src) 

調整並行的水平需要。

+0

這是行不通的。我運行curl命令打端點。然而,連接被關閉。 – user3294786