我最後寫了一個 GraphStage：
/**
 * Groups consecutive upstream elements that share the same key into batches.
 *
 * The key of every incoming element is computed with `f`. While the key equals
 * the key of the batch being accumulated, the element is appended to that
 * batch and more input is requested. When an element with a different key
 * arrives, the accumulated batch is pushed downstream and a new batch is
 * started with that element. When upstream finishes, the remaining batch is
 * emitted (only if at least one element was received) and the output is
 * completed.
 *
 * @param f extracts the grouping key from an element; keys are compared with `==`
 * @tparam A type of the stream elements
 * @tparam B type of the grouping key
 */
class FlowAggregation[A, B](f: A => B) extends GraphStage[FlowShape[A, Seq[A]]] {
  val in: Inlet[A] = Inlet("in")
  val out: Outlet[Seq[A]] = Outlet("out")
  override val shape: FlowShape[A, Seq[A]] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Key of the batch currently being accumulated; None until the first element arrives.
      private var currentKey: Option[B] = None
      // Immutable on purpose: a batch that was already pushed downstream can never be
      // altered by later accumulation, and Vector is a valid Seq on every Scala version.
      private var aggregate: Vector[A] = Vector.empty

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val element = grab(in)
          // Compute the key exactly once per element.
          val key = f(element)
          // `forall` is true for None, so the very first element always opens a batch.
          if (currentKey.forall(key == _)) {
            currentKey = Some(key)
            aggregate = aggregate :+ element
            pull(in)
          } else {
            // Key changed: hand the finished batch downstream and start a new one.
            // No pull here; the next demand signal (onPull) requests more input.
            push(out, aggregate)
            currentKey = Some(key)
            aggregate = Vector(element)
          }
        }

        override def onUpstreamFinish(): Unit = {
          // Flush the trailing batch, but do not emit an empty batch for an empty stream.
          if (aggregate.nonEmpty) emit(out, aggregate)
          complete(out)
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}