2016-04-27 84 views
0

需要建議,我需要運行並行多個源圖,例如,我創建了這個示例代碼,我創建了10個圖並將它們並行運行。Akka流 - 並行運行圖

這是正確的方法,還是應該在圖中創建多個源並在一個圖中並行運行它們?

def createGraph(start: Int, end: Int, name: String) = { 
    RunnableGraph.fromGraph(GraphDSL.create() { 
    implicit builder => 
     import GraphDSL.Implicits._ 
     val s = Source(start to end) 
     val f = Flow[Int].map[String](x => x.toString) 
     val sink = Sink.foreach[String](x => println(name + ":" + x)) 

     val t = builder.add(s) 

     val flow1 = builder.add(f) 

     t ~> flow1 ~> sink 

     ClosedShape 
    }) 
} 


(1 to 10).map(x => createGraph(x, x + 10, "g" + x)).map(_.run()) 

感謝 阿倫

+0

爲什麼所有的代碼做等價的:源(開始到結束).MAP(_的toString。).runForeach(X =>的println(S「$名: $ x「)) –

+0

我拿出了一些複雜的流程處理從主要代碼維護知識產權。問題陳述是如果我有多個源信息並需要爲每個源運行圖,那麼最佳實踐是什麼。例如,我們可以考慮多個來源可能會考慮閱讀多個卡夫卡主題,轉換,處理和沉浸數據庫。 – ASe

+0

這個方法對我來說有點奇怪,但我不能完全指出原因。我會分開時間把事情做成自己的舞臺,然後使用合併和/或平衡將它們組合成一張圖。然後運行一個圖表,只需一次。現在你正在創建n個「島嶼」。 – akauppi

回答

0

我試圖使用並行http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-parallelism.html,這個不錯,我的來源是不同的,但流動和匯是same.Each源是模擬在下面的例子中,認爲他們喜歡你從某些外部源讀取的數據流:

object TestParallelGraph extends App { 

    implicit val system = ActorSystem("test") 
    implicit val dispacher = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    val listOfDifferentSource=List(1,2,3) //consider we have to read data from various sources 


def createGraph() = { 
    RunnableGraph.fromGraph(GraphDSL.create() { 
     implicit builder => 
     import GraphDSL.Implicits._ 

     val merge=builder.add(Merge[Int](listOfDifferentSource.length)) 

     val flow=builder.add(Flow[Int].map(_ + 10)) //just random flow to add 10 

     //again as mentioned above creating source with different information to simulate 
     Source(listOfDifferentSource.head*100 to 100* listOfDifferentSource.head+10) ~> merge ~> flow ~> Sink.foreach(println) 

     for{ 
      i <- listOfDifferentSource.tail //for each other source 
     }yield (Source(100*i to 100*i+10) ~> merge) 

     ClosedShape 
    }) 
    } 

    createGraph().run() 
} 
+0

請注意,這不是並行運行,而是依次運行,您需要使用'.async'標記異步邊界,如鏈接到的文檔中所述。 – johanandren

+0

謝謝約翰。我使用的是kaka-stream-and-http-experimental 2.0.3,所以我認爲mapAsync應該注意這一點。 – ASe

+0

我的代碼將成爲val flow = builder.add(Flow [Int] .mapAsync(parallelism = 10)(_ + 10)) – ASe