2017-07-26 44 views
1

我有RunnableGraph像下面。當broadcastmerge階段之間有簡單的map時,一切都很好。但是,當涉及到mapConcat時,此代碼在使用第一個元素後不起作用。Akka流mapConcat不能與循環運行RunnableGraph

我想知道爲什麼它不起作用。

RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
import GraphDSL.Implicits._ 

val M = b.add(MergePreferred[Int](1)) 
val B = b.add(Broadcast[Int](2)) 
val S = Source(List(3)) 

S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore 
M.preferred <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B 
ClosedShape 
}) 

// run() output: 
// 3 
// List(2,2,2) 

回答

0

mapConcat階段阻止反饋循環,這是預期的。考慮下面的事件鏈:

  1. mapConcat函數打印List(2,2,2)
  2. mapConcat階段需要需求以發射所述第一3個可用元素(2,2,2)
  3. 需求具有來從合併階段,因此從廣播階段。
  4. 廣播級反壓,如果它的任何下游反壓。它的下游是Sink.ignore(從不背壓)和mapConcat本身。
  5. mapConcat背壓如果「仍然存在來自先前計算的集合的其餘元素」,則根據docs。事實確實如此。

換句話說,你的週期是不平衡的。您在反饋循環中引入的元素多於您要刪除的元素。

此問題在this documentation page中有詳細說明,其中還介紹了一些解決方案。對於您的具體情況,由於您有過濾器階段,引入大於的緩衝區將打印所有元素。但是請注意,圖表只會掛起,之後不會完成。

S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore 
M.preferred <~ Flow[Int].buffer(20, OverflowStrategy.dropHead) <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B 
+0

感謝您的回答,這真的很有幫助!在你解釋之後,我發現'conflateWithSeed'更適合我的情況,因爲我不想在溢出時丟失消息。 –