2017-08-10 60 views
1

我有一些奇怪的行爲,每當我嘗試使用Akka的Streams測試工具包時,都會看到這種行爲。使用Akka Streams測試工具包時發生零星超時失敗

我有以下自定義定義:

trait PauseFilter[T] { 

    def shouldPause(message: T): Boolean 
} 

trait MessagePauser[T] { 

    def pause(message: T): Unit 
} 

trait MessageUnPauser[T] { 

    def unPause: Option[T] 
} 

object PausableFlow { 

    def pausableFlow[T](filter: PauseFilter[T], pauser: MessagePauser[T], unpauser: MessageUnPauser[T]): Flow[T, T, _] = 

    Flow.fromGraph(
     GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
     import GraphDSL.Implicits._ 

     val initialMessagesIn = builder.add(Flow[T]) 
     val finalMessagesOut = builder.add(Flow[T]) 
     val unPauserIterator =() => Iterator.continually(unpauser.unPause) 
     val unpausedMessages = Source.fromIterator(unPauserIterator).collect { case Some(m) => m } 
     val pausedMessagesSink = Sink.foreach(pauser.pause) 
     val determineIfPaused = builder.add(Partition[T](2, message => if (filter.shouldPause(message)) 1 else 0)) 
     val merge = builder.add(Merge[T](2)) 

     unpausedMessages ~> merge 
     initialMessagesIn ~> merge ~> determineIfPaused 
               determineIfPaused.out(1) ~> pausedMessagesSink 
               determineIfPaused.out(0) ~> finalMessagesOut 

     FlowShape(initialMessagesIn.in, finalMessagesOut.out) 
     } 
    ) 
} 

除了我已在流動的規範定義了以下完整性檢查測試:

test("a pauser that pauses half the messages should propagate only half the messages and pause the other half") { 
    val filter = new PauseFilter[Int] { 
    override def shouldPause(m: Int): Boolean = if (m % 2 == 0) true else false 
    } 

    val unpauser = mock[MessageUnPauser[Int]] 
    Mockito.when(unpauser.unPause).thenReturn(None) 

    val pauser = mock[MessagePauser[Int]] 

    val (pub, sub) = getTestHandles(PausableFlow.pausableFlow[Int](filter, pauser, unpauser)) 

    sub.request(200) 
    1 to 100 foreach pub.sendNext 
    1 to 100 filter (m => m % 2 == 1) foreach sub.expectNext 
    1 to 100 filter (m => m % 2 == 0) foreach Mockito.verify(pauser, Mockito.times(1)).pause 
} 

private def getTestHandles[T](flow: Flow[T, T, _]) = 
    TestSource.probe[T] 
    .via(flow) 
    .toMat(TestSink.probe[T])(Keep.both) 
    .run() 

正在發生的事情是,我得到零星的故障,但有以下例外情況:

assertion failed: timeout (3 seconds) during expectMsg: 
java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg: 
    at scala.Predef$.assert(Predef.scala:170) 
    at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:405) 
    at akka.testkit.TestKit.expectMsgPF(TestKit.scala:814) 
    at akka.stream.testkit.StreamTestKit$PublisherProbeSubscription.expectRequest(StreamTestKit.scala:716) 
    at akka.stream.testkit.TestPublisher$Probe.sendNext(StreamTestKit.scala:173) 
    at com.company.stream.pausable.PausableFlowSpec$$anonfun$3$$anonfun$apply$mcV$sp$5.apply(PausableFlowSpec.scala:70) 
    at com.company.stream.pausable.PausableFlowSpec$$anonfun$3$$anonfun$apply$mcV$sp$5.apply(PausableFlowSpec.scala:70) 
    at scala.collection.immutable.Range.foreach(Range.scala:160) 
    at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply$mcV$sp(PausableFlowSpec.scala:70) 
    at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply(PausableFlowSpec.scala:57) 
    at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply(PausableFlowSpec.scala:57) 
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) 
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) 
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) 
    at org.scalatest.Transformer.apply(Transformer.scala:22) 
    at org.scalatest.Transformer.apply(Transformer.scala:20) 
    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) 
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122) 
    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555) 
    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) 
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) 
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) 
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) 
    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175) 
    at org.scalatest.FunSuite.runTest(FunSuite.scala:1555) 
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) 
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) 
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) 
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) 
    at scala.collection.immutable.List.foreach(List.scala:392) 
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) 
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) 
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) 
    at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208) 
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1555) 
    at org.scalatest.Suite$class.run(Suite.scala:1424) 
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555) 
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) 
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) 
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545) 
    at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212) 
    at com.company.stream.pausable.PausableFlowSpec.org$scalatest$BeforeAndAfterAll$$super$run(PausableFlowSpec.scala:16) 
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) 
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) 
    at com.company.stream.pausable.PausableFlowSpec.run(PausableFlowSpec.scala:16) 
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55) 
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563) 
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557) 
    at scala.collection.immutable.List.foreach(List.scala:392) 
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557) 
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044) 
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043) 
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722) 
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043) 
    at org.scalatest.tools.Runner$.run(Runner.scala:883) 
    at org.scalatest.tools.Runner.run(Runner.scala) 
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138) 
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28) 

我已經離開了一些代碼出來,只是爲了給你一個參考點,所以你可以將例外與實際的代碼聯繫起來。當調用1 to 100 foreach pub.sendNext時拋出異常。我很困惑,爲什麼會發生這種情況,因爲沒有任何操作導致這種行爲非常耗時。也許我在這裏錯過了一些東西。很高興能得到您的意見。

回答

0

問題在於unPausedMessages - 這會殺死(性能明智)的流。考慮馴服這些電話: val unpausedMessages = Source.tick(FiniteDuration(0, "ms"), FiniteDuration(100, "ms"), unpauser.unPause) .collect { case Some(m) => m }或同等學歷。

發生的其他事情是,您嘗試將多個項目推入流中而不消耗。 考慮在Future內運行1 to 100 foreach pub.sendNext,並行執行消費者。

相關問題