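Akka Ask keeps timing out in a merge sort implementation

I am a complete beginner with Akka and Scala. As one of my first projects, I decided to implement merge sort where, instead of recursing, I instantiate new actors to do the splitting and merging. My system seems to reach the leaves of the merge sort tree, and some merges even happen, but then it stalls and I get an AskTimeoutException. I ran into a similar problem with Ask in another project. Can someone point me in the right direction?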
The ParentMerger receive implementation:
def receive = {
  case ParentMerger.Begin => {
    implicit var timeout = Timeout(60.seconds)
    println("Parent sending off first halves")
    // Assumption: at the beginning the array size is 2 or greater
    var arrayFuture1 = mergers(0) ? ParentMerger.SendHalf(array.slice(0, array.length/2))
    var arrayFuture2 = mergers(1) ? ParentMerger.SendHalf(array.slice(array.length/2, array.length))
    arrayFuture1.onComplete {
      case Success(Merger.Reply(arr1: ArrayBuffer[Int])) => {
        arrayFuture2.onComplete {
          case Success(Merger.Reply(arr2: ArrayBuffer[Int])) => {
            print(merge(arr1, arr2).toString())
            println("Final merge done")
          }
        }
      }
    }
  }
}
The Merger receive implementation:
def receive = {
  case ParentMerger.SendHalf(array: ArrayBuffer[Int]) if array.length == 1 => {
    println("Child received array of size 1")
    sender() ! Merger.Reply(array)
  }
  case ParentMerger.SendHalf(array: ArrayBuffer[Int]) if array.length >= 2 => {
    println("Child received an array of size >= 2")
    for(i <- 0 to 1) {
      mergers(i) = context.actorOf(Props[Merger])
    }
    implicit var timeout = Timeout(60.seconds)
    var arrayFuture1 = mergers(0) ? ParentMerger.SendHalf(array.slice(0, array.length/2))
    var arrayFuture2 = mergers(1) ? ParentMerger.SendHalf(array.slice(array.length/2, array.length))
    arrayFuture1.onComplete {
      case Success(Merger.Reply(arr1: ArrayBuffer[Int])) => {
        arrayFuture2.onComplete {
          case Success(Merger.Reply(arr2: ArrayBuffer[Int])) => {
            println("Child merge")
            sender() ! Merger.Reply(merge(arr1, arr2))
          }
        }
      }
    }
  }
}
The output I get:
Parent sending off first halves
Child received an array of size >= 2
Child received an array of size >= 2
Child received an array of size >= 2
Child received an array of size >= 2
Child received array of size 1
Child received array of size 1
Child received an array of size >= 2
Child received an array of size >= 2
Child received array of size 1
Child received an array of size >= 2
Child received array of size 1
Child received array of size 1
Child received array of size 1
Child received an array of size >= 2
Child received array of size 1
Child received array of size 1
Child merge
Child merge
Child received array of size 1
Child merge
Child merge
Child received array of size 1
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$a/$b/$b#-1137516511] to Actor[akka://Main/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$b/$a#2073409209] to Actor[akka://Main/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$a/$a#-1459967586] to Actor[akka://Main/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$b/$b/$b#-2142577608] to Actor[akka://Main/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
scala.MatchError: Failure(akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://Main/user/app/$a#520493765]] after [60000 ms]. Sender[Actor[akka://Main/user/app#1966408365]] sent message of type "ParentMerger$SendHalf".) (of class scala.util.Failure)
at ParentMerger$$anonfun$receive$1.$anonfun$applyOrElse$1(ParentMerger.scala:67)
at ParentMerger$$anonfun$receive$1.$anonfun$applyOrElse$1$adapted(ParentMerger.scala:66)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
That fixed it. Thanks! So if I call sender() inside the onComplete block, does it refer to the sender of Merger.Reply?
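It isn't obvious to me what exactly it would be. It might be *your* actor (i.e. `self`), but the doubly nested `onComplete` blocks make me think it could be some system actor that you can't even access. I've learned that it is usually best to avoid the `ask` pattern altogether: it is very vulnerable to *apparently innocent* refactorings that leave you with code that fails cryptically (and silently).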
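
The mechanism behind the `sender()` question can be reproduced without Akka. Below is a minimal sketch using only the Scala standard library. `FakeActor`, `currentSender` and `SenderCaptureDemo` are hypothetical names invented for this illustration: `currentSender` stands in for `sender()`, which is only meaningful while a message is being processed. The sketch compares reading that state inside a `Future` callback with capturing it in a `val` first.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for an actor (not Akka code): `currentSender` plays
// the role of sender(), which is tied to the message currently being handled.
class FakeActor {
  @volatile var currentSender: String = "deadLetters"

  // Handles one "message": registers callbacks on `child` and then returns,
  // just as receive returns long before an ask Future completes.
  def handle(from: String, child: Future[Int]): (Future[String], Future[String]) = {
    currentSender = from
    val replyTo = currentSender                // captured while the message is handled
    val late  = child.map(_ => currentSender)  // read later, when the callback runs
    val early = child.map(_ => replyTo)        // uses the captured value
    currentSender = "deadLetters"              // message handling is over
    (late, early)
  }
}

object SenderCaptureDemo {
  def run(): (String, String) = {
    val actor = new FakeActor
    val childReply = Promise[Int]()
    val (late, early) = actor.handle("parent", childReply.future)
    childReply.success(42)                     // the child answers after handle() returned
    (Await.result(late, 5.seconds), Await.result(early, 5.seconds))
  }

  def main(args: Array[String]): Unit = {
    val (late, early) = run()
    println(s"read inside callback: $late, captured up front: $early")
  }
}
```

Running `SenderCaptureDemo.main` prints `read inside callback: deadLetters, captured up front: parent`, which mirrors the dead-letter log lines in the question. In a real actor, the equivalent step would be something like `val replyTo = sender()` before the `?` calls, followed by `replyTo ! Merger.Reply(...)` inside the callback. Piping the combined future back with `pipeTo` from `akka.pattern.pipe` is another common option.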