我有一些奇怪的行爲,每當我嘗試使用Akka的Streams測試工具包時,都會看到這種行爲。使用Akka Streams測試工具包時發生零星超時失敗
我有以下自定義定義:
trait PauseFilter[T] {
def shouldPause(message: T): Boolean
}
trait MessagePauser[T] {
def pause(message: T): Unit
}
trait MessageUnPauser[T] {
def unPause: Option[T]
}
object PausableFlow {
def pausableFlow[T](filter: PauseFilter[T], pauser: MessagePauser[T], unpauser: MessageUnPauser[T]): Flow[T, T, _] =
Flow.fromGraph(
GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val initialMessagesIn = builder.add(Flow[T])
val finalMessagesOut = builder.add(Flow[T])
val unPauserIterator =() => Iterator.continually(unpauser.unPause)
val unpausedMessages = Source.fromIterator(unPauserIterator).collect { case Some(m) => m }
val pausedMessagesSink = Sink.foreach(pauser.pause)
val determineIfPaused = builder.add(Partition[T](2, message => if (filter.shouldPause(message)) 1 else 0))
val merge = builder.add(Merge[T](2))
unpausedMessages ~> merge
initialMessagesIn ~> merge ~> determineIfPaused
determineIfPaused.out(1) ~> pausedMessagesSink
determineIfPaused.out(0) ~> finalMessagesOut
FlowShape(initialMessagesIn.in, finalMessagesOut.out)
}
)
}
除了我已在流動的規範定義了以下完整性檢查測試:
test("a pauser that pauses half the messages should propagate only half the messages and pause the other half") {
val filter = new PauseFilter[Int] {
override def shouldPause(m: Int): Boolean = if (m % 2 == 0) true else false
}
val unpauser = mock[MessageUnPauser[Int]]
Mockito.when(unpauser.unPause).thenReturn(None)
val pauser = mock[MessagePauser[Int]]
val (pub, sub) = getTestHandles(PausableFlow.pausableFlow[Int](filter, pauser, unpauser))
sub.request(200)
1 to 100 foreach pub.sendNext
1 to 100 filter (m => m % 2 == 1) foreach sub.expectNext
1 to 100 filter (m => m % 2 == 0) foreach Mockito.verify(pauser, Mockito.times(1)).pause
}
private def getTestHandles[T](flow: Flow[T, T, _]) =
TestSource.probe[T]
.via(flow)
.toMat(TestSink.probe[T])(Keep.both)
.run()
正在發生的事情是,我得到零星的故障,但有以下例外情況:
assertion failed: timeout (3 seconds) during expectMsg:
java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg:
at scala.Predef$.assert(Predef.scala:170)
at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:405)
at akka.testkit.TestKit.expectMsgPF(TestKit.scala:814)
at akka.stream.testkit.StreamTestKit$PublisherProbeSubscription.expectRequest(StreamTestKit.scala:716)
at akka.stream.testkit.TestPublisher$Probe.sendNext(StreamTestKit.scala:173)
at com.company.stream.pausable.PausableFlowSpec$$anonfun$3$$anonfun$apply$mcV$sp$5.apply(PausableFlowSpec.scala:70)
at com.company.stream.pausable.PausableFlowSpec$$anonfun$3$$anonfun$apply$mcV$sp$5.apply(PausableFlowSpec.scala:70)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply$mcV$sp(PausableFlowSpec.scala:70)
at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply(PausableFlowSpec.scala:57)
at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply(PausableFlowSpec.scala:57)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
at org.scalatest.Suite$class.run(Suite.scala:1424)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
at com.company.stream.pausable.PausableFlowSpec.org$scalatest$BeforeAndAfterAll$$super$run(PausableFlowSpec.scala:16)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
at com.company.stream.pausable.PausableFlowSpec.run(PausableFlowSpec.scala:16)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
at org.scalatest.tools.Runner$.run(Runner.scala:883)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
我已經離開了一些代碼出來,只是爲了給你一個參考點,所以你可以將例外與實際的代碼聯繫起來。當調用1 to 100 foreach pub.sendNext
時拋出異常。我很困惑,爲什麼會發生這種情況,因爲沒有任何操作導致這種行爲非常耗時。也許我在這裏錯過了一些東西。很高興能得到您的意見。