2017-03-02 90 views
0

阿卡流我有一個流是並行運行

  1. 偵聽HTTP後接收事件的
  2. 列表mapconcat事件的流元素
  3. 轉換事件卡夫卡記錄列表
  4. 產生與反應性卡夫卡(akka流卡夫卡生產者水槽)的記錄

這裏是簡化代碼

// flow to split group of lines into lines 
    val splitLines = Flow[List[Evt]].mapConcat(list=>list) 

// sink to produce kafka records in kafka 
val kafkaSink: Sink[Evt, Future[Done]] = Flow[Evt] 
    .map(evt=> new ProducerRecord[Array[Byte], String](evt.eventType, evt.value)) 
    .toMat(Producer.plainSink(kafka))(Keep.right) 

val routes = { 
    path("ingest") { 
     post { 
     (entity(as[List[ReactiveEvent]]) & extractMaterializer) { (eventIngestList,mat) => 
      val ingest= Source.single(eventIngestList).via(splitLines).runWith(kafkaSink)(mat) 
      val result = onComplete(ingest){ 
       case Success(value) => complete(s"OK") 
       case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")) 
      } 
      complete("eventList ingested: " + result) 
      } 
     } 
    } 
    } 

您能否突出顯示什麼是並行運行,什麼是順序?

我認爲mapConcat序列化流中的事件,所以如何在並行處理mapConcat每個步驟之後並行化流?

簡單的mapAsyncUnordered是否足夠?或者我應該使用GraphDSL進行平衡和合並?

回答

2

在你的情況下,它會順序我想。在開始向Kafka推送數據之前,您也會收到整個請求。我會用extractDataBytes指令給你src: Source[ByteString, Any]。然後我會處理它像

src 
    .via(Framing.delimiter(ByteString("\n"), 1024 /* Max size of line */ , allowTruncation = true).map(_.utf8String)) 
    .mapConcat { line => 
    line.split(",") 
    }.async 
    .runWith(kafkaSink)(mat) 
+0

extractDataBytes的問題是,我不能解開JSON easly ... – vgkowski

+0

嗯,我明白了。我從來不需要解析json的無限流,但我聽說jawn支持它。 https://github.com/non/jawn/blob/master/parser/src/main/scala/jawn/AsyncParser.scala – expert

+0

也可以查看http://owlike.github.io/genson/ – expert