2017-04-24 63 views
4

我想要一個Source以給定的時間間隔評估一個函數併發出它的輸出。作爲一種解決方法,我可以用Source.queue + offer來做到這一點,但還沒有找到一個更清晰的方法來做到這一點。理想情況下,我會有類似Akka Stream,來自函數?

def myFunction() = ....      // function with side-effects 
Source.tick(1.second, 1.second, myFunction) // myFunction is evaluated at every tick 

任何想法?

回答

8

可能是最清晰的方式是使用map

Source.tick(1.second, 1.second, NotUsed).map(_ ⇒ myFunction()) 
1

我猜,throttle是你所需要的。完全Source可運行例如施加到可迭代,其使用功能在next()

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.ThrottleMode.Shaping 
import akka.stream.scaladsl.Source 

import scala.concurrent.duration._ 

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 
var i = 0 

def myFunction(): Int = { 
    i = i + 1 
    i 
} 

import scala.collection.immutable.Iterable 

val x: Iterable[Int] = new Iterable[Int] { 
    override def iterator: Iterator[Int] = 
    new Iterator[Int] { 
     override def hasNext: Boolean = true 

     override def next(): Int = myFunction() 
    } 
} 
Source(x).throttle(1, 1.second, 1, Shaping).runForeach(println) 

throttle參數:節氣門源用1個元件每1秒鐘最大突發= 1,用暫停發射消息,以滿足節氣門速率(即前什麼是Shaping)。