2016-02-29 67 views
2

我使用圖dsl來創建一些基於我看到的示例代碼的流處理作業。一切都運行很好,我只是無法理解的符號:(更新2.4)Akka Streams圖DSL符號

def elements: Source[Foos] = ... 
def logEveryNSink = // a sink that logs 
def cleaner: Flow[Foos, Bars, Unit] = ... 

def boolChecker(bar: Bar)(implicit ex: ExecutionContext): Future[Boolean] = ... 

val mySink = Sink.foreach[Boolean](println(_)) 

val lastly = Flow[Bars].mapAsync(2)(x => boolChecker(x).toMat(mySink)(Keep.right) 

val materialized = RunnableGraph.fromGraph(
GraphDSL.create(lastly) { implicit builder => 
    baz => { 
    import GraphDSL.Implicits._ 
    val broadcast1 = builder.add(Broadcast[Foos](2)) 
    val broadcast2 = builder.add(Broadcast[Bars](2)) 
    elements ~> broadcast1 ~> logEveryNSink(1) 
       broadcast1 ~> cleaner ~> broadcast2 ~> baz 
            ~> broadcast2 ~> logEveryNSink(1) 
    ClosedShape 
} 
} 
).run() 

據我所知,包括隱含的建設者,但是我不確定什麼baz代表{ implicit builder => baz => { ...。它只是整個形狀的一個隱含名稱?

回答

5

GraphDSL.create方法嚴重超載,以接受許多輸入形狀(包括0)的變體。如果你在沒有初始的形狀通過,然後buildBlock功能ARG(身體,你實際上定義圖形如何待建)的簽名如下:

(Builder[NotUsed]) => S 

所以這是一個簡單的Function1[Builder[NotUsed], S],即是一個函數,它需要一個Builder[NotUsed]的實例並返回一個Shape實例,它是最終圖。這裏的NotUsedUnit同義,因爲你所說的是不通過任何輸入份額,你不關心正在生成的輸出圖的物化值。

如果您決定通過輸入形狀,那麼buildBlock函數的簽名會稍微改變以適合輸入形狀。在你的情況,你逝去的1周輸入的形狀,所以buildBlock更改簽名:現在

(Builder[Mat]) => Graph.Shape => S 

,這基本上是一個Function1[Builder[Mat], Function1[Graph.Shape, S]],或者說需要一個Builder[Mat](其中Mat是物化的值類型的函數的輸入形狀)並返回一個函數,該函數採用Graph.Shape並返回S(它是Shape)的實例。長話短說,如果你通過形狀傳遞,那麼你還需要聲明它們作爲圖形積木功能的限制參數,但作爲第二個輸入函數(因此額外的=>)。

+0

!!感謝您的明確解釋 – Azeli

相關問題