0
阿卡流我有一個流是並行運行
- 偵聽HTTP後接收事件的
- 列表mapconcat事件的流元素
- 轉換事件卡夫卡記錄列表
- 產生與反應性卡夫卡(akka流卡夫卡生產者水槽)的記錄
這裏是簡化代碼
// flow to split group of lines into lines
val splitLines = Flow[List[Evt]].mapConcat(list=>list)
// sink to produce kafka records in kafka
val kafkaSink: Sink[Evt, Future[Done]] = Flow[Evt]
.map(evt=> new ProducerRecord[Array[Byte], String](evt.eventType, evt.value))
.toMat(Producer.plainSink(kafka))(Keep.right)
val routes = {
path("ingest") {
post {
(entity(as[List[ReactiveEvent]]) & extractMaterializer) { (eventIngestList,mat) =>
val ingest= Source.single(eventIngestList).via(splitLines).runWith(kafkaSink)(mat)
val result = onComplete(ingest){
case Success(value) => complete(s"OK")
case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
complete("eventList ingested: " + result)
}
}
}
}
您能否突出顯示什麼是並行運行,什麼是順序?
我認爲mapConcat序列化流中的事件,所以如何在並行處理mapConcat每個步驟之後並行化流?
簡單的mapAsyncUnordered是否足夠?或者我應該使用GraphDSL進行平衡和合並?
extractDataBytes的問題是,我不能解開JSON easly ... – vgkowski
嗯,我明白了。我從來不需要解析json的無限流,但我聽說jawn支持它。 https://github.com/non/jawn/blob/master/parser/src/main/scala/jawn/AsyncParser.scala – expert
也可以查看http://owlike.github.io/genson/ – expert