我想實現一個定製的Akka接收器,但是我找不到處理它內部未來的方法。如何處理定製的阿卡水槽內的未來?
class EventSink(...) {
val in: Inlet[EventEnvelope2] = Inlet("EventSink")
override val shape: SinkShape[EventEnvelope2] = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
// This requests one element at the Sink startup.
override def preStart(): Unit = pull(in)
setHandler(in, new InHandler {
override def onPush(): Unit = {
val future = handle(grab(in))
Await.ready(future, Duration.Inf)
/*
future.onComplete {
case Success(_) =>
logger.info("pulling next events")
pull(in)
case Failure(failure) =>
logger.error(failure.getMessage, failure)
throw failure
}*/
pull(in)
}
})
}
}
private def handle(envelope: EventEnvelope2): Future[Unit] = {
val EventEnvelope2(query.Sequence(offset), _/*persistenceId*/, _/*sequenceNr*/, event) = envelope
...
db.run(statements.transactionally)
}
}
我必須去阻止未來,這看起來不好。我評論過的非封鎖內容僅適用於第一項活動。任何人都可以請幫忙?
更新感謝@ViktorKlang。它現在似乎在工作。
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
{
new GraphStageLogic(shape) {
val callback = getAsyncCallback[Try[Unit]] {
case Success(_) =>
//completeStage()
pull(in)
case Failure(error) =>
failStage(error)
}
// This requests one element at the Sink startup.
override def preStart(): Unit = {
pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val future = handle(grab(in))
future.onComplete { result =>
callback.invoke(result)
}
}
})
}
}
我想實現一個Rational DB事件接收器連接到ReadJournal.eventsByTag。所以這是一個連續的流,除非有錯誤,否則它將永不終止 - 這就是我想要的。我的方法是否正確?
兩個問題:
請問GraphStage永遠不會結束,除非我手動調用completeStage或failStage?
我是否正確或正常地聲明preStart方法之外的回調?在這種情況下,我是否有權在preStart中調用pull(in)?
感謝, 程
http://doc.akka.io/docs/akka/current/scala/流/流自定義。html#使用異步端通道 –
感謝@ViktorKlang,我已經閱讀過。沒有找到有用的東西。 – Cheng
爲什麼我鏈接的細分市場沒有用?你試過了嗎? –