我已經開始使用Akka Streams和Op-Rabbit,並且有點困惑。Akka Streams scala DSL和Op-Rabbit
我需要根據謂詞拆分流,然後將它們組合起來,就像我在創建圖形和使用分區和合並時完成的一樣。
我已經能夠做這樣的事情使用GraphDSL.Builder,但似乎無法得到它與AckedSource /流程/水槽工作
圖形看起來像:
| --> flow1 --> |
source--> partition --> | | --> flow3 --> sink
| --> flow2 --> |
我不確定是否應該使用,因爲我總是需要2個流量。
這是沒有做分區和不使用GraphDSL.Builder一個樣本:
def splitExample(source: AckedSource[String, SubscriptionRef],
queueName: String)
(implicit actorSystem: ActorSystem): RunnableGraph[SubscriptionRef] = {
val toStringFlow: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]]
.map[AckTup[String]](tup => {
val (p,m) = tup
(p, new String(m.data))
})
val printFlow1: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
.map[AckTup[String]](tup => {
val (p, s) = tup
println(s"flow1 processing $s")
tup
})
val printFlow2: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
.map[AckTup[String]](tup => {
val (p, s) = tup
println(s"flow2 processing $s")
tup
})
source
.map(Message.queue(_, queueName))
.via(AckedFlow(toStringFlow))
// partition if string.length < 10
.via(AckedFlow(printFlow1))
.via(AckedFlow(printFlow2))
.to(AckedSink.ack)
}
這是我似乎無法得到的代碼工作:
import GraphDSL.Implicits._
def buildModelAcked(source: AckedSource[String, SubscriptionRef] , queueName: String)(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = {
import GraphDSL.Implicits._
GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
import GraphDSL.Implicits._
source.map(Message.queue(_, queueName)) ~> AckedFlow(toStringFlow) ~> AckedSink.ack
// source.map(Message.queue(_, queueName)).via(AckedFlow(toStringFlow)).to(AckedSink.ack)
ClosedShape
}}
編譯器不能解析~>
操作
所以我的問題是:
是否有使用Scala的DSL構建ACKED /來源/流程/接收器的圖形示例項目?
是否有一個分區和合並的示例項目,與我在此嘗試的類似?
請注意,爲了獲得'〜>'和其他奇特操作符,您需要在DSL構建函數的主體內導入'GraphDSL.Implicits._'。 –
謝謝,我已經更新了代碼以進行導入,但是它仍然不能解析'〜>'運算符 –