我想在其上創建Source
,後來推元素,如在:如何創建可以通過方法調用稍後接收元素的源?
val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
什麼是推薦的方式做到這一點?
謝謝!
我想在其上創建Source
,後來推元素,如在:如何創建可以通過方法調用稍後接收元素的源?
val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
什麼是推薦的方式做到這一點?
謝謝!
有三種方法可以做到這一點:
1.郵政物化與SourceQueue
您可以使用Source.queue
是物化的流入SourceQueue
:
case class Weather(zipCode : String, temperature : Double, raining : Boolean)
val bufferSize = 100
//if the buffer fills up then this strategy drops the oldest elements
//upon the arrival of a new element.
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val queue = Source.queue(bufferSize, overflowStrategy)
.filter(!_.raining)
.to(Sink foreach println)
.run() // in order to "keep" the queue Materialized value instead of the Sink's
queue offer Weather("02139", 32.0, true)
2 。演員實現物化
有一個類似的問題和回答here,要點是,你兌現了流作爲ActorRef和郵件發送到REF:與演員
val ref = Source.actorRef[Weather](Int.MaxValue, fail)
.filter(!_.raining)
.to(Sink foreach println)
.run() // in order to "keep" the ref Materialized value instead of the Sink's
ref ! Weather("02139", 32.0, true)
3.預物化同樣,您可以明確地創建一個包含消息緩衝區的Actor,使用該Actor創建一個Source,然後發送該Actor消息,如回答here:
object WeatherForwarder {
def props : Props = Props[WeatherForwarder]
}
//see provided link for example definition
class WeatherForwarder extends Actor {...}
val actorRef = actorSystem actorOf WeatherForwarder.props
//note the stream has not been instatiated yet
actorRef ! Weather("02139", 32.0, true)
//stream already has 1 Weather value to process which is sitting in the
//ActorRef's internal buffer
val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}
object WeatherForwarder {
def props : Props = Props[WeatherForwarder]
}
//see provided link for example definition
class WeatherForwarder extends Actor {...}
val actorRef = actorSystem actorOf WeatherForwarder.props
//note the stream has not been instatiated yet
actorRef ! Weather("02139", 32.0, true)
//stream already has 1 Weather value to process which is sitting in the
//ActorRef's internal buffer
val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}
Prematerialization with隊列:http://stackoverflow.com/a/36467193/591922 – Loic
@Loic我沒有從你的評論中得到「預先實現隊列」將是第四個可能的解決方案。它是。我發現這很好:http://stackoverflow.com/questions/37113877/how-can-i-use-and-return-source-queue-to-caller-without-materializing-it/37117205#37117205 – akauppi
@akauppi在你發佈的鏈接,如果你'mapMaterializedValue'它會創建另一個來源。他使用Future來獲得他想要返回的源的隊列。 –
這可能是一樣的:http://stackoverflow.com/questions/29072963/how-to-add-elements-to-source-dynamically/29077212#29077212 – cmbaxter
@cmbaxter的確。雖然我想通過向某個演員發送消息來提供流,但沒有爲演員實例化或有類。我相信這可以通過'Source.actorRef'功能來實現,正如我在鏈接和發佈的帖子中看到的那樣:http://stackoverflow.com/questions/30785011/accessing-the-underlying-actorref-of-an-akka -stream源創建的逐源的行爲。非常感謝) – ale64bit
您可能也會發現這個答案有幫助: https://stackoverflow.com/questions/40345697/how-to-use-akka-http-client-websocket-send-message/44605821#44605821 –