2017-05-28 113 views

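In a simple test, I expect a stream to generate and print numbers for one second. I want to test stream operations that handle backpressure, so I need a Source that does not support backpressure. Why does the stream stop immediately, and how can I prevent it?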

... with FreeSpec ... {

    implicit val system = ActorSystem(this.getClass.getSimpleName)
    private val matSettings: ActorMaterializerSettings =
        ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true)
    implicit val mat = ActorMaterializer(matSettings.withInputBuffer(1, 1))

    "must print numbers for a second" in {

        val source: Source[Double, ActorRef] =
            Source.actorRef(100, OverflowStrategy.fail).map(_ => Random.nextDouble())

        val sink: Sink[Double, Future[Done]] = Sink.foreach(println)

        val actorRef: ActorRef = Flow[Double].to(sink).runWith(source)

        system.scheduler.schedule(0.micro, 1.milli, actorRef, "tick")(system.dispatcher)

        Thread.sleep(1000)
        println("done")
    }
}

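However, the actor seems to stop right after the flow is materialized, with no messages sent or only a couple. What am I misunderstanding here, and how do I get the expected result? The log: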

08:24:07.077 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] 
08:24:07.078 DEBUG akka.event.EventStream - subscribing Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] to channel class akka.actor.UnhandledMessage 
08:24:07.079 DEBUG akka.event.EventStream - Default Loggers started 
08:24:07.079 DEBUG a.e.LoggingBus$$anonfun$startDefaultLoggers$2$$anon$3 - started ([email protected]f15e8) 
08:24:07.079 DEBUG akka.event.EventStream - unsubscribing StandardOutLogger from all channels 
08:24:07.080 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] 
08:24:07.080 DEBUG akka.event.EventStream - subscribing Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] to channel class akka.actor.DeadLetter 
08:24:07.080 DEBUG akka.event.DeadLetterListener - started ([email protected]) 
08:24:07.081 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434] 
08:24:07.081 DEBUG akka.event.EventStreamUnsubscriber - registering unsubscriber with [email protected] 
08:24:07.081 DEBUG akka.event.EventStream - initialized unsubscriber to: Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434], registering 3 initial subscribers with it 
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - started ([email protected]) 
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/log1-Slf4jLogger#28712451] in order to unsubscribe from EventStream when it terminates 
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] in order to unsubscribe from EventStream when it terminates 
08:24:07.082 DEBUG akka.event.slf4j.Slf4jLogger - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434] 
08:24:07.083 DEBUG a.e.LoggingBus$$anonfun$startDefaultLoggers$2$$anon$3 - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434] 
08:24:07.083 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] in order to unsubscribe from EventStream when it terminates 
08:24:07.083 DEBUG akka.event.DeadLetterListener - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434] 
08:24:07.164 DEBUG a.a.LocalActorRefProvider$Guardian - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0#-544371192] 
08:24:07.164 DEBUG akka.stream.impl.StreamSupervisor - started ([email protected]) 
08:24:07.173 WARN a.stream.impl.ActorMaterializerImpl - Fuzzing mode is enabled on this system. If you see this warning on your production system then set akka.stream.materializer.debug.fuzzing-mode to off. 
08:24:07.270 DEBUG akka.stream.impl.StreamSupervisor - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-0-unknown-operation#478879902] 
08:24:07.276 DEBUG a.s.i.fusing.ActorGraphInterpreter - started ([email protected]) 
08:24:07.276 DEBUG akka.stream.impl.StreamSupervisor - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] 
08:24:07.283 DEBUG akka.stream.impl.ActorRefSourceActor - started ([email protected]) 
08:24:07.289 INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://BuffersAndTicksSpec/deadLetters] to Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
08:24:07.290 DEBUG akka.stream.impl.ActorRefSourceActor - stopped 
08:24:07.291 DEBUG a.s.i.fusing.ActorGraphInterpreter - stopped 
08:24:07.297 INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://BuffersAndTicksSpec/deadLetters] to Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
done 

Thread.sleep blocks the calling thread and may hold up things in the actor system. It would be better to get the Future out of the stream and Await on it. – Stephen

Answer


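The problem here is the type of your Source.actorRef. Even though an actor can receive messages of type Any, once you wrap it in a Source you need to give it a type (to comply with Akka Streams' strong typing).

Example: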

val source: Source[Int, ActorRef] = Source.actorRef[Int](100, OverflowStrategy.fail) 

What happens under the hood is that your Source will try to cast every incoming message to Int.

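In your case, Source.actorRef is not explicitly typed, so the compiler infers Nothing. (This is masked by the fact that you attach a map stage, after which everything becomes a Double.) Every incoming "tick" message gets cast to Nothing, which causes a ClassCastException.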
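The failing cast can be reproduced without Akka. The following is a minimal sketch, assuming Scala 2 semantics; `receiveAs` and `NothingCastDemo` are hypothetical names that only stand in for the point where an untyped message is handed to a typed stage, they are not Akka APIs.

```scala
object NothingCastDemo {
  // Hypothetical stand-in for an untyped mailbox feeding a typed stage.
  // Because of type erasure, the cast is not checked inside this method;
  // it is checked where the result is used with its concrete type.
  def receiveAs[T](msg: Any): T = msg.asInstanceOf[T]

  // With T = Nothing (what the compiler infers when no type argument is
  // given), the caller must cast the result to Nothing, which no value
  // can satisfy, so a ClassCastException is thrown.
  def castToNothingFails(msg: Any): Boolean =
    try {
      receiveAs[Nothing](msg)
    } catch {
      case _: ClassCastException => true
    }

  def main(args: Array[String]): Unit = {
    println(receiveAs[String]("tick"))   // typed correctly: the cast succeeds
    println(castToNothingFails("tick"))  // typed as Nothing: the cast fails
  }
}
```

Giving the type parameter explicitly, as in `receiveAs[String]`, is the same move as typing the Source.actorRef stage.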

The fix is to give the Source.actorRef stage an explicit type:

val source: Source[Double, ActorRef] =
    Source.actorRef[String](100, OverflowStrategy.fail).map(_ => Random.nextDouble())
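Combining this fix with the suggestion in the comments to wait on a Future instead of calling Thread.sleep, the whole test might look roughly like the sketch below. It assumes the Akka 2.5-era API used in the question (ActorMaterializer, the two-argument Source.actorRef, scheduler.schedule); the object name `TicksForASecond` and the 5-second timeouts are illustrative choices, not part of the original code.

```scala
import akka.Done
import akka.actor.{ActorRef, ActorSystem, Status}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random

object TicksForASecond extends App {
  implicit val system: ActorSystem = ActorSystem("TicksForASecond")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // ExecutionContext for the scheduler

  // Explicitly typed: incoming "tick" messages are Strings.
  val source: Source[Double, ActorRef] =
    Source.actorRef[String](100, OverflowStrategy.fail).map(_ => Random.nextDouble())

  // Keep.both exposes the source's ActorRef and the sink's completion Future.
  val (actorRef, done): (ActorRef, Future[Done]) =
    source.toMat(Sink.foreach[Double](println))(Keep.both).run()

  // Send a "tick" every millisecond, as in the question.
  val ticks = system.scheduler.schedule(Duration.Zero, 1.milli, actorRef, "tick")

  // After one second, stop ticking and complete the stream;
  // Status.Success lets already buffered elements drain first.
  system.scheduler.scheduleOnce(1.second) {
    ticks.cancel()
    actorRef ! Status.Success(Done)
  }

  // Wait for the stream itself to finish rather than sleeping.
  Await.result(done, 5.seconds)
  println("done")
  Await.result(system.terminate(), 5.seconds)
}
```

Because OverflowStrategy.fail is kept from the question, the `done` Future fails if the sink ever falls more than 100 elements behind, and Await.result then rethrows that failure instead of hiding it.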

Hey, that was a tricky one; it works now. Thanks a lot, Stefano! – kostja