這是不是很清楚你想從響應中得到什麼。 stream()
返回Future
,因此如果你在flatMap
上,你還需要從處理程序返回未來。你想在那裏返回什麼?例如,如果你想獲得Future[String]
與響應的整個身體,你可以使用runReduce(_ + _)
:
val result: Future[String] = ws.url(url).stream()
.flatMap(response => response.body
.via(framing)
.map(_.utf8String)
.map(_ + "\n")
.runReduce(_ + _)
)
runReduce(f: (U, U) => U)
返回Future[U]
,那就是,在你的情況下,將Future[String]
。如果你想通過一些其他的功能分別處理輸入流的每個元素,你可以使用runForeach
:
ws.url(url).stream()
.flatMap(response => response.body
.via(framing)
.map(_.utf8String)
.map(_ + "\n")
.runForeach(s => handleString(s))
)
沒有你想要做的更多的細節,這是很難提供一個更具體的答案。
更新:如果要限制從外部服務器來的郵件,你可以使用內置的throttle
組合子:
val result: Future[Source[String, _]] = ws.url(url)
.stream()
.map { response =>
response.body
.via(framing)
.map(_.utf8String)
.map(_ + "\n")
.throttle(10, 1.second, 10, ThrottleMode.Shaping)
}
這裏,result
未來將包含流String
s,實現時,每秒鐘最多可產生10個元素,必要時進行反壓。您可以在我上面鏈接的文檔中找到更多信息。
謝謝 - 輸出是JSON字符串。我想要得到這個字符串,並用scala來表示它並更新我的案例類。 –
@SakarSR,我還是不明白,對不起。 「短語」是什麼意思?考慮編寫一個你想在沒有反應流的情況下做什麼的例子。 –
用戶名:@Vladimir url web服務器連續不斷地發送json數據。我想捕獲數據並對數據進行一些計算並將其傳遞給前端。我也想減慢我的網絡服務器(反壓)。計劃使用Akka流。我希望它清楚。 –