0
我試圖用一個簡單的循環創建一個Akka Stream。在閱讀文檔here並且沒有運氣的情況下,我嘗試將示例代碼複製爲起始基礎,但這也不起作用。代碼編譯(在包含示例中缺少的源代碼之後),但沒有打印出來。它看起來好像有些東西一直反壓,但我不明白爲什麼。Akka Streams文檔中的循環示例不起作用
這裏是我的代碼,任何幫助將是非常讚賞:
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl._
import akka.stream.ClosedShape
object Simulate {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
def main(args: Array[String]): Unit = {
// Define simulation flowgraph
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import b._
import GraphDSL.Implicits._
val source = add(Source.repeat[Int](1))
val zip = add(ZipWith[Int, Int, Int]((left, right) => left))
val bcast = add(Broadcast[Int](2))
val concat = add(Concat[Int]())
val start = add(Source.single[Int](0))
val sink = add(Sink.ignore)
source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> sink
concat <~ bcast
zip.in1 <~ concat <~ start
ClosedShape
})
g.run()
}
}
謝謝,我試過了,但不幸的是它不起作用。我得到了同樣的東西,沒有任何東西流過圖表。 – Oli
這很奇怪。我確實得到了它的工作。讓我粘貼答案中的所有代碼。 – manub
@Oli我已經更新了答案。似乎在將'Broadcast'的出口流入'Concat'之前,爲'zip'定義第二個入口是有用的。我不確定這是什麼確切的原因。 – manub