2016-03-04 61 views
0

我試圖用一個簡單的循環創建一個Akka Stream。在閱讀文檔here並且沒有運氣的情況下,我嘗試將示例代碼複製爲起始基礎,但這也不起作用。代碼編譯(在包含示例中缺少的源代碼之後),但沒有打印出來。它看起來好像有些東西一直反壓,但我不明白爲什麼。Akka Streams文檔中的循環示例不起作用

這裏是我的代碼,任何幫助將是非常讚賞:

import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, ActorMaterializerSettings} 
import akka.stream.scaladsl._ 
import akka.stream.ClosedShape 

object Simulate { 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    def main(args: Array[String]): Unit = { 

    // Define simulation flowgraph 
    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
     import b._ 
     import GraphDSL.Implicits._ 

     val source = add(Source.repeat[Int](1)) 
     val zip  = add(ZipWith[Int, Int, Int]((left, right) => left)) 
     val bcast = add(Broadcast[Int](2)) 
     val concat = add(Concat[Int]()) 
     val start = add(Source.single[Int](0)) 
     val sink = add(Sink.ignore) 

     source ~> zip.in0 
       zip.out.map { s => println(s); s } ~> bcast ~> sink 
          concat  <~   bcast 
       zip.in1 <~ concat  <~   start 
     ClosedShape 
    }) 

    g.run() 

    } 
} 

回答

1

編輯:它實際上看來問題不加入一個緩衝區,但爲了入口/被宣佈網點。

這工作:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    val source = Source.repeat(1) 
    val start = Source.single(0) 

    val zip = b.add(ZipWith((left: Int, right: Int) => left)) 
    val bcast = b.add(Broadcast[Int](2)) 
    val concat = b.add(Concat[Int]()) 

    source ~> zip.in0 
      zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore 
      zip.in1 <~ concat <~ start 
         concat <~ bcast 
    ClosedShape 
}) 

g.run() 

zip.in1 <~ concat <~ startconcat <~ bcast的順序與上Docs什麼是一致的。

+0

謝謝,我試過了,但不幸的是它不起作用。我得到了同樣的東西,沒有任何東西流過圖表。 – Oli

+0

這很奇怪。我確實得到了它的工作。讓我粘貼答案中的所有代碼。 – manub

+0

@Oli我已經更新了答案。似乎在將'Broadcast'的出口流入'Concat'之前,爲'zip'定義第二個入口是有用的。我不確定這是什麼確切的原因。 – manub