2017-07-18 81 views
0

我已經開始使用Akka Streams和Op-Rabbit,並且有點困惑。Akka Streams scala DSL和Op-Rabbit

我需要根據謂詞拆分流,然後將它們組合起來,就像我在創建圖形和使用分區和合並時完成的一樣。

我已經能夠做這樣的事情使用GraphDSL.Builder,但似乎無法得到它與AckedSource /流程/水槽工作

圖形看起來像:

     | --> flow1 --> | 
source--> partition --> |    | --> flow3 --> sink 
         | --> flow2 --> | 

我不確定是否應該使用,因爲我總是需要2個流量。

這是沒有做分區和不使用GraphDSL.Builder一個樣本:

def splitExample(source: AckedSource[String, SubscriptionRef], 
       queueName: String) 
       (implicit actorSystem: ActorSystem): RunnableGraph[SubscriptionRef] = { 
    val toStringFlow: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]] 
    .map[AckTup[String]](tup => { 
     val (p,m) = tup 
     (p, new String(m.data)) 
    }) 

    val printFlow1: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]] 
    .map[AckTup[String]](tup => { 
     val (p, s) = tup 
     println(s"flow1 processing $s") 
     tup 
    }) 

    val printFlow2: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]] 
    .map[AckTup[String]](tup => { 
     val (p, s) = tup 
     println(s"flow2 processing $s") 
     tup 
    }) 

    source 
    .map(Message.queue(_, queueName)) 
    .via(AckedFlow(toStringFlow)) 
    // partition if string.length < 10 
    .via(AckedFlow(printFlow1)) 
    .via(AckedFlow(printFlow2)) 
    .to(AckedSink.ack) 
} 

這是我似乎無法得到的代碼工作:

import GraphDSL.Implicits._ 
def buildModelAcked(source: AckedSource[String, SubscriptionRef] , queueName: String)(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = { 
    import GraphDSL.Implicits._ 
    GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s => 
    import GraphDSL.Implicits._ 
    source.map(Message.queue(_, queueName)) ~> AckedFlow(toStringFlow) ~> AckedSink.ack 
//  source.map(Message.queue(_, queueName)).via(AckedFlow(toStringFlow)).to(AckedSink.ack) 
    ClosedShape 

}}

編譯器不能解析~>操作

所以我的問題是:

  1. 是否有使用Scala的DSL構建ACKED /來源/流程/接收器的圖形示例項目?

  2. 是否有一個分區和合並的示例項目,與我在此嘗試的類似?

+0

請注意,爲了獲得'〜>'和其他奇特操作符,您需要在DSL構建函數的主體內導入'GraphDSL.Implicits._'。 –

+0

謝謝,我已經更新了代碼以進行導入,但是它仍然不能解析'〜>'運算符 –

回答

1

在處理acked-stream時,請牢記以下定義。

  1. AckedSource[Out, Mat]Source[AckTup[Out], Mat]]
  2. AckedFlow[In, Out, Mat]一個包裝爲Flow[AckTup[In], AckTup[Out], Mat]
  3. AckedSink[In, Mat]一個包裝爲Sink[AckTup[In], Mat]
  4. AckTup[T]一個包裝爲(Promise[Unit], T)
  5. 別名的經典流組合子將在操作T部分AckTup
  6. .acked組合子將完成Promise[Unit]AckedFlow

的GraphDSL邊緣運營商(~>)將開始處理一堆阿卡預定義的形狀(見GraphDSL.Implicits的代碼),但它不會對自定義形狀工作由acks-stream lib定義。

你有2種方式進行:

  1. 你定義自己的~>隱含的操作,沿着那些的線GraphDSL.Implicits
  2. 你解開的ACKED階段獲得標準的階段。您可以使用.wrappedRepr訪問包裹階段 - 可在AckedSource,AckedFlowAckedSink上獲得。
+0

這是我看起來無法工作的代碼: 'def buildModelAcked(source:AckedSource [String (隱式構造器):GraphDSL.Builder [Future [Done]] =>隱式構造器:GraphDSL.Builder [Future [Done]] =>隱式actorSystem:ActorSystem) (AckFlow(toStringFlow)〜> AkedFlow(toStringFlow)〜> AkedSink.ack source.map(Message.queue(_,queueName))。via(AckedFlow(toStringFlow) ))。(AckedSink.ack) ClosedShape }} works w/o dsl,但不能解決〜>運營商 –

+0

你能給我一個簡單的例子嗎? 謝謝 –

+0

非常好!那是失蹤的一塊。 謝謝 –

0

基於斯特凡諾 - 博內蒂出色的方向,這裏是一個可能的解決方案:

graph:  
         |--> short --| 
    rabbitMq --> before --|   |--> after 
         |--> long --| 

解決方案:

val before: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]].map[AckTup[String]](tup => { 
    val (p,m) = tup 
    (p, new String(m.data)) 
}) 

val short: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => { 
    val (p, s) = tup 
    println(s"short: $s") 
    tup 
}) 
val long: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => { 
    val (p, s) = tup 
    println(s"long: $s") 
    tup 
}) 
val after: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => { 
    val (p, s) = tup 
    println(s"all $s") 
    tup 
}) 

def buildSplitGraph(source: AckedSource[String, SubscriptionRef] 
        , queueName: String 
        , splitLength: Int)(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = { 
GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s => 
    val toShort = 0 
    val toLong = 1 

    // junctions 
    val split = builder.add(Partition[AckTup[String]](2, (tup: AckTup[String]) => { 
                  val (p, s) = tup 
                  if (s.length < splitLength) toShort else toLong 
                 } 
    )) 
    val merge = builder.add(Merge[AckTup[String]](2)) 

    //graph 
    val beforeSplit = source.map(Message.queue(_, queueName)).wrappedRepr ~> AckedFlow(before).wrappedRepr 
    beforeSplit ~> split 
    // must do short, then long since the split goes in that order 
    split ~> AckedFlow(short).wrappedRepr ~> merge 
    split ~> AckedFlow(long).wrappedRepr ~> merge 
    // after the last AckedFlow, be sure to '.acked' so that the message will be removed from the queue 
    merge ~> AckedFlow(after).acked ~> s 

    ClosedShape 
}} 

正如斯特凡諾 - 博內蒂說,關鍵是使用與相關聯的.wrappedReprAckedFlow,然後使用.acked組合器作爲最後一步。