2017-03-04 128 views
0

如何在下面的代碼返回concurrent.Future:如何處理流是來自WSClient在阿卡流數據2

val nextMeetup = ws.url(url).stream() 
     .flatMap(response => response.body 
      .via(framing) 
      .map(_.utf8String) 
      .map(_ + "\n") 
     ) 

類型不匹配錯誤: 發現:akka.stream.scaladsl.Source [字符串,_ $ 2] 要求:scala.concurrent.Future []

我的網址分裂JSON數據流 - 它不是一個Twitter流

請解釋來解決這個問題?

回答

0

這是不是很清楚你想從響應中得到什麼。 stream()返回Future,因此如果你在flatMap上,你還需要從處理程序返回未來。你想在那裏返回什麼?例如,如果你想獲得Future[String]與響應的整個身體,你可以使用runReduce(_ + _)

val result: Future[String] = ws.url(url).stream() 
    .flatMap(response => response.body 
     .via(framing) 
     .map(_.utf8String) 
     .map(_ + "\n") 
     .runReduce(_ + _) 
    ) 

runReduce(f: (U, U) => U)返回Future[U],那就是,在你的情況下,將Future[String]。如果你想通過一些其他的功能分別處理輸入流的每個元素,你可以使用runForeach

ws.url(url).stream() 
    .flatMap(response => response.body 
     .via(framing) 
     .map(_.utf8String) 
     .map(_ + "\n") 
     .runForeach(s => handleString(s)) 
    ) 

沒有你想要做的更多的細節,這是很難提供一個更具體的答案。


更新:如果要限制從外部服務器來的郵件,你可以使用內置的throttle組合子:

val result: Future[Source[String, _]] = ws.url(url) 
    .stream() 
    .map { response => 
    response.body 
     .via(framing) 
     .map(_.utf8String) 
     .map(_ + "\n") 
     .throttle(10, 1.second, 10, ThrottleMode.Shaping) 
    } 

這裏,result未來將包含流String s,實現時,每秒鐘最多可產生10個元素,必要時進行反壓。您可以在我上面鏈接的文檔中找到更多信息。

+0

謝謝 - 輸出是JSON字符串。我想要得到這個字符串,並用scala來表示它並更新我的案例類。 –

+0

@SakarSR,我還是不明白,對不起。 「短語」是什麼意思?考慮編寫一個你想在沒有反應流的情況下做什麼的例子。 –

+0

用戶名:@Vladimir url web服務器連續不斷地發送json數據。我想捕獲數據並對數據進行一些計算並將其傳遞給前端。我也想減慢我的網絡服務器(反壓)。計劃使用Akka流。我希望它清楚。 –