2016-07-14 26 views
0

假設我有一個工作者actor接收消息,進行一些處理並返回結果。並予有需要被轉換成結果的序列消息的序列:如果我運行上面只用「消息1」如何使用提問模式編譯結果序列

object Test { 

    case class Message(str: String) 
    case class Result(str: String) 

    class Worker extends Actor { 
    def receive = { 
     case Message(data) => 
      println("Sleeping: " + data) 
      Thread.sleep(10000) 
      val result = Result(data + " - result") 
      println("Sending result: " + result) 
      sender ! result 
    } 
    } 

    def test(messages: Seq[Message]): Future[Seq[Result]] = { 
    val worker = ActorSystem().actorOf(Props(new Worker)) 
    val results = messages.map { m => 
     implicit val timeout = Timeout(20 seconds) 
     println("Sending: " + m) 
     val result = worker ? m 
     result.asInstanceOf[Future[Result]] 
    } 
    Future.sequence(results) 
    } 

    def main(args: Array[String]): Unit = { 
    val messages: Seq[Message] = args.map(Message(_)) 
    test(messages).foreach { r => 
     println("Result: " + r) 
    } 
} 

}

作爲參數運行良好給予的輸出低於:

發送:消息(消息1)

睡眠:消息1

硒nding結果:結果(消息1 - 結果)

結果:ArraySeq(結果(消息1 - 結果))

但是說我做它: 「消息1」,「消息2 「‘消息3’,那麼最後的消息最終被髮送到deadLetters:

發送:消息(消息1)發送:消息(消息2)睡眠: 消息1發送:消息(消息-3)

發送結果:結果(消息1 - 結果)

睡眠:消息2

發送結果:結果(消息-2 - 結果)

睡眠:消息3

發送結果:結果(消息3 - 結果)

[INFO] [07/15/2016 09:07:49.832] [default-akka.actor.default-dispatcher-2] [akka:// default/deadLetters] Message [util.Tester $ Result] from 演員[akka:// default/user/$ a#1776546850]到 演員[akka:// default/deadLetters]未送達。 [1]遇到死亡字母 。此日誌記錄可以關閉或通過 配置設置'akka.log-dead-letters'和 'akka.log-dead-letters-during-shutdown'進行調整。

我猜這是因爲我的調用線程在發送最後一條消息時超出了範圍。如何正確地將所有結果收集到一個序列中?

注意改變自己的測試方法,下面給出了相同的結果:

def test(messages: Seq[Message]): Future[Seq[Result]] = { 
    val worker = ActorSystem().actorOf(Props(new Worker)) 
    Future.traverse(messages) { m => 
     implicit val timeout = Timeout(20 seconds) 
     println("Sending: " + m) 
     val result = worker ? m 
     result.asInstanceOf[Future[Result]] 
    } 
} 
+0

變量「actor」不存在,你的意思是'worker'在這裏嗎?不知道你爲什麼看到死信,除非你的工作人員沒有正確響應,就像可能使用'sender()'不正確。此外,您可能想要查看「Future.sequence」以將所有結果作爲單個「Future」來獲得。但我無法確定這些問題是否能解決您的問題。 – acjay

回答

0

似乎是因爲我的超時設置得太低。應該足夠大以覆蓋所有的工作 - 例如40秒。

0

裝聾作啞的答案是:

Future.traverse(messages)(m => actor ? m).map(_.asInstanceOf[Result]) 

但它可能會更好一次全部發送數據:

class Worker extends Actor { 
    def receive = { 
    case Message(data) => 
     // Convert data into result 
     ... 
     sender ! result 
    case seq: Seq[Message] => 
     ... 
     sender ! results 

    } 
} 
+3

你也可以使用'Future.traverse(list)(m => actor?m ...)'。 「遍歷」類似於「映射」和「序列」的組合。 –

+0

@PeterNeyens確實,隨時編輯或自己回答:) – ipoteka