1
我有RunnableGraph
像下面。當broadcast
和merge
階段之間有簡單的map
時,一切都很好。但是,當涉及到mapConcat
時,此代碼在使用第一個元素後不起作用。Akka流mapConcat不能與循環運行RunnableGraph
我想知道爲什麼它不起作用。
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val M = b.add(MergePreferred[Int](1))
val B = b.add(Broadcast[Int](2))
val S = Source(List(3))
S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore
M.preferred <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B
ClosedShape
})
// run() output:
// 3
// List(2,2,2)
感謝您的回答,這真的很有幫助!在你解釋之後,我發現'conflateWithSeed'更適合我的情況,因爲我不想在溢出時丟失消息。 –