2017-08-05 59 views
0

我想實現一個定製的Akka接收器,但是我找不到處理它內部未來的方法。如何處理定製的阿卡水槽內的未來?

class EventSink(...) { 

    val in: Inlet[EventEnvelope2] = Inlet("EventSink") 
    override val shape: SinkShape[EventEnvelope2] = SinkShape(in) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { 
    new GraphStageLogic(shape) { 

     // This requests one element at the Sink startup. 
     override def preStart(): Unit = pull(in) 

     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val future = handle(grab(in)) 
      Await.ready(future, Duration.Inf) 
      /* 
      future.onComplete { 
      case Success(_) => 
       logger.info("pulling next events") 
       pull(in) 
      case Failure(failure) => 
       logger.error(failure.getMessage, failure) 
       throw failure 
      }*/ 
      pull(in) 
     } 
     }) 
    } 
    } 

    private def handle(envelope: EventEnvelope2): Future[Unit] = { 
    val EventEnvelope2(query.Sequence(offset), _/*persistenceId*/, _/*sequenceNr*/, event) = envelope 
    ... 
    db.run(statements.transactionally) 
    } 
} 

我必須去阻止未來,這看起來不好。我評論過的非封鎖內容僅適用於第一項活動。任何人都可以請幫忙?


更新感謝@ViktorKlang。它現在似乎在工作。

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
{ 
    new GraphStageLogic(shape) { 
     val callback = getAsyncCallback[Try[Unit]] { 
     case Success(_) => 
      //completeStage() 
      pull(in) 
     case Failure(error) => 
      failStage(error) 
     } 

     // This requests one element at the Sink startup. 
     override def preStart(): Unit = { 
     pull(in) 
     } 

     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val future = handle(grab(in)) 
      future.onComplete { result => 
      callback.invoke(result) 
      } 
     } 
     }) 
    } 
    } 

我想實現一個Rational DB事件接收器連接到ReadJournal.eventsByTag。所以這是一個連續的流,除非有錯誤,否則它將永不終止 - 這就是我想要的。我的方法是否正確?

兩個問題:

  1. 請問GraphStage永遠不會結束,除非我手動調用completeStage或failStage?

  2. 我是否正確或正常地聲明preStart方法之外的回調?在這種情況下,我是否有權在preStart中調用pull(in)?

感謝, 程

+0

http://doc.akka.io/docs/akka/current/scala/流/流自定義。html#使用異步端通道 –

+0

感謝@ViktorKlang,我已經閱讀過。沒有找到有用的東西。 – Cheng

+0

爲什麼我鏈接的細分市場沒有用?你試過了嗎? –

回答

0

免俗階段

在一般情況下,你應該儘量吸盡與圖書館的SourceFlowSink的給出的方法所有的可能性。自定義階段幾乎不需要,並且使代碼難以維護。

編寫你的「自定義」階段使用標準方法

基於對你的問題的示例代碼我看不出有任何理由,你爲什麼會使用自定義Sink開始與細節。

鑑於你的handle方法,你可以稍微修改它這樣做,你在問題中指定的日誌記錄:

val loggedHandle : (EventEnvelope2) => Future[Unit] = 
    handle(_) transform { 
    case Success(_)  => { 
     logger.info("pulling next events") 
     Success(Unit) 
    } 
    case Failure(failure) => { 
     logger.error(failure.getMessage, failure) 
     Failure(failure) 
    } 
    } 

那麼就使用Sink.foreachParallel處理信封:

val createEventEnvelope2Sink : (Int) => Sink[EventEnvelope2, Future[Done]] = 
    (parallelism) => 
    Sink[EventEnvelope2].foreachParallel(parallelism)(handle _) 

現在,即使您希望將每個EventEnvelope2發送到db,以便您只需使用1進行並行:

val inOrderDBInsertSink : Sink[EventEnvelope2, Future[Done]] = 
    createEventEnvelope2Sink(1) 

而且,如果數據庫拋出一個異常,你仍然可以得到它保持在foreachParallel完成:

val someEnvelopeSource : Source[EventEnvelope2, _] = ??? 

someEnvelopeSource 
    .to(createEventEnvelope2Sink(1)) 
    .run() 
    .andThen { 
    case Failure(throwable) => { /*deal with db exception*/ } 
    case Success(_)   => { /*all inserts succeeded*/ } 
    }