2016-05-13 63 views
2

我在Akka Stream中瞭解到,一個插座必須連接到一個插座,並且沒有自動支持將多個接收器連接到相同的信號源。所以你必須插入中間對象,如BroadcastAkka Stream:在圖形構建器階段動態增長插座

我正在將一個信號處理DAG轉換爲一個Akka Stream圖,如果我可以在通過遍歷發現它們時動態地添加接收器,它將會對我有很大的幫助。如果我有自定義GraphStage,我可以在Graph.create階段有我自己的Shapeoutlets系列動態增長嗎?正常的DSL操作~>由這個調用支持:

b.addEdge(importAndGetPort(b), to) 

如何生成器「獲得」 Outlet在這裏,我將能夠在其上生長的需求我的形狀?


如果這不起作用,是有可能「退出」之前播出,斷開其邊緣及連接線路與圖形建設過程中一個新的更大的廣播?

+0

我不知道潛在問題的答案,但是這種設計有一個很大的缺點:信息源只會像最慢的目的地一樣快地傳播值。當所有接收器處理完一條消息時,廣播只會表示更多的需求。你的問題似乎更適合直接使用Actor。然後,您可以使用路由動態添加到Actor的目的地。 http://doc.akka.io/docs/akka/2.4.2/scala/routing.html –

+0

@RamonJRomeroyVigil是的,源只能處理一次所有的接收器準備就緒。這是故意的,因爲源節點可能執行昂貴且不可重複的計算。如果需要,我可以在源和匯之間插入緩衝區。 –

回答

1

GraphDSL不允許動態改變你的形狀。

但是,由於Akka 2.4.10可以使用BroadcastHub(和MergeHub)。

BroadcastHub可以爲您提供一個接收器,可以實現爲一個源。 這個來源可以按照需要多次實現,以動態附加多個訂閱者。

因此,對於你DAG的節點(例如,具有入度= 1和出度= 3),你可以有像

val hubSource = inEdgeSource.toMat(BroadcastHub.sink(bufferSize = ...))(Keep.right).run() 

val nodeSink1 = hubSource.to(outEdgeSink1).run() 
val nodeSink2 = hubSource.to(outEdgeSink2).run() 
val nodeSink3 = hubSource.to(outEdgeSink3).run() 

阿卡文檔:

http://doc.akka.io/docs/akka/2.4/scala/stream/stream-dynamic.html#Dynamic_fan-in_and_fan-out_with_MergeHub_and_BroadcastHub