2016-07-07 72 views
2

隨着阿卡流的早期版本,groupBy返回SourceSource A S可能被物化到一個Source[Seq[A]]如何創建阿卡流源[序列[A]從源代碼[A]

隨着Akka Streams 2.4我看到groupBy返回SubFlow - 我不清楚如何使用這個。我需要適用於流程的轉換必須具有整個Seq可用,所以我不能只在SubFlow(我認爲)上map

我寫過一個類extends GraphStage,它通過GraphStageLogic中的可變集合進行聚合,但是有沒有內置的功能?我是否錯過了SubFlow

回答

0

我最後寫一個GraphStage

class FlowAggregation[A, B](f: A => B) extends GraphStage[FlowShape[A, Seq[A]]] { 
    val in: Inlet[A] = Inlet("in") 
    val out: Outlet[Seq[A]] = Outlet("out") 
    override val shape = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 

     private var counter: Option[B] = None 
     private var aggregate = scala.collection.mutable.ArrayBuffer.empty[A] 

     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val element = grab(in) 

      counter.fold({ 
      counter = Some(f(element)) 
      aggregate += element 
      pull(in) 
      }) { p => 
      if (f(element) == p) { 
       aggregate += element 
       pull(in) 
      } else { 
       push(out, aggregate) 
       counter = Some(f(element)) 
       aggregate = scala.collection.mutable.ArrayBuffer(element) 
      } 
      } 
     } 
     override def onUpstreamFinish(): Unit = { 
      emit(out, aggregate) 
      complete(out) 
     } 
     }) 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 
    } 
}