2017-11-11 155 views
0

閱讀akka流的文檔,我不太清楚諸如消息順序之類的東西,以及我是否可以執行它。讓我用我爲聊天服務器編寫的一小段代碼來設置我的問題的上下文。使用akka流時的事件順序

def flowShape(user: User) = GraphDSL 
    .create(Source.actorRef[ChatMessage](bufferSize = 5, OverflowStrategy.fail)) { 
    implicit builder => 
     implicit chatSource => 

     import GraphDSL.Implicits._ 

     val messageFromOutside = builder.add(Flow[String].map { 
     case msg: String => UserTextMessage(user, msg) 
     case _ => InvalidMessage 
     }) 

     val merge = builder.add(Merge[ChatMessage](2)) 
     // UPDATE --> this is where the change comes 
     // val merge = builder.add(Concat[ChatMessage](2)) 

     // val channelActorSink = Sink.actorRefWithAck(channelActor, ActorInitMessage, AckMessage, UserLeft(user)) 
     val channelActorSink = Sink.actorRef(channelActor, UserLeft(user)) 

     val actorAsSource = builder.materializedValue.map { actor => UserJoined(user, actor) } 

     actorAsSource ~> merge.in(0) 
     messageFromOutside.out ~> merge.in(1) 
     merge ~> channelActorSink 

     FlowShape(messageFromOutside.in, chatSource.out) 
} 

讓事情簡單我自己,我用這個流形,一個非常簡單的源和宿。像這樣的東西 -

val source = Source(List[String]("hi", "hello", "what are you upto", "this is nice")) 
val sink = Sink.foreach[ChatMessage] { 
    case tm: UserTextMessage => println(s"${tm.user.username} :: ${tm.content}") 
    case ul: UserLeft => println(s"${ul.user.username} just left the channel") 
    case uj: UserJoined => println(s"${uj.user.username} just joined the channel") 
    case _ => println(s"do not know what I just received") 
} 

val mychatchannel = new Channel(420, myactorsystem) 

source.via(mychatchannel.chatFlow(User("sushruta"))).runWith(sink) 

現在,這裏來了我的關注。打印在終端中的事件順序根本不好。我不知道如何解決它。這是我得到的輸出 -

[INFO] [11/10/2017 17:42:20.431] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/10/2017 17:42:20.441] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] received a user joined message 
[INFO] [11/10/2017 17:42:20.443] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/10/2017 17:42:20.444] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 

輸出中缺少第一條消息hi。在打印UserJoin message之前似乎已經發送了hi消息。

我試着通過使用actorRefWithAck(我在上面的代碼中註釋過)修復它(並且還添加了一些消息傳遞的安全性)。它給出了類似的輸出。

[INFO] [11/11/2017 06:33:03.731] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] channel initialized and ready to take events 
[INFO] [11/11/2017 06:33:03.735] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.736] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a user joined message 
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-4] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a UserLeft message 

顯然似乎是發生的是,源發送郵件發送UserJoin消息之前。我怎樣才能解決這個問題?從概念上講,我認爲我希望UserJoin message在資源實現時,但在實際發送第一條消息之前立即發送。那可能嗎?

謝謝

回答

0

把溪流想成水管:當有水時,它會流動。合併運算符不關心來自哪個元素。如果你想訂購這些輸入,你需要通過使用Concat來告訴Akka。

+0

非常感謝。這幫助我並最終解決了這個問題。 – shashydhar