2017-02-14 75 views

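Akka Ask keeps timing out in a merge sort implementation

I am a complete beginner with Akka and Scala. As one of my first projects, I decided to implement merge sort where, instead of recursing, I instantiate new actors to do the splitting and merging. My system appears to reach the leaves of the merge sort tree, and some merges even happen, but then it stalls and I get an AskTimeoutException. I ran into a similar problem in another project that used Ask. Can someone point me in the right direction?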

The receive implementation of ParentMerger:

def receive = {
  case ParentMerger.Begin => {
    implicit var timeout = Timeout(60.seconds)
    println("Parent sending off first halves")
    // Assumption: at the beginning the array size is 2 or greater
    var arrayFuture1 = mergers(0) ? ParentMerger.SendHalf(array.slice(0, array.length/2))
    var arrayFuture2 = mergers(1) ? ParentMerger.SendHalf(array.slice(array.length/2, array.length))

    arrayFuture1.onComplete {
      case Success(Merger.Reply(arr1: ArrayBuffer[Int])) => {
        arrayFuture2.onComplete {
          case Success(Merger.Reply(arr2: ArrayBuffer[Int])) => {
            print(merge(arr1, arr2).toString())
            println("Final merge done")
          }
        }
      }
    }
  }
}

The receive implementation of Merger:

def receive = {
  case ParentMerger.SendHalf(array: ArrayBuffer[Int]) if array.length == 1 => {
    println("Child received array of size 1")
    sender() ! Merger.Reply(array)
  }
  case ParentMerger.SendHalf(array: ArrayBuffer[Int]) if array.length >= 2 => {
    println("Child received an array of size >= 2")
    for(i <- 0 to 1) {
      mergers(i) = context.actorOf(Props[Merger])
    }

    implicit var timeout = Timeout(60.seconds)
    var arrayFuture1 = mergers(0) ? ParentMerger.SendHalf(array.slice(0, array.length/2))
    var arrayFuture2 = mergers(1) ? ParentMerger.SendHalf(array.slice(array.length/2, array.length))

    arrayFuture1.onComplete {
      case Success(Merger.Reply(arr1: ArrayBuffer[Int])) => {
        arrayFuture2.onComplete {
          case Success(Merger.Reply(arr2: ArrayBuffer[Int])) => {
            println("Child merge")
            sender() ! Merger.Reply(merge(arr1, arr2))
          }
        }
      }
    }
  }
}

The output I get:

Parent sending off first halves 
Child received an array of size >= 2 
Child received an array of size >= 2 
Child received an array of size >= 2 
Child received an array of size >= 2 
Child received array of size 1 
Child received array of size 1 
Child received an array of size >= 2 
Child received an array of size >= 2 
Child received array of size 1 
Child received an array of size >= 2 
Child received array of size 1 
Child received array of size 1 
Child received array of size 1 
Child received an array of size >= 2 
Child received array of size 1 
Child received array of size 1 
Child merge 
Child merge 
Child received array of size 1 
Child merge 
Child merge 
Child received array of size 1 
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$a/$b/$b#-1137516511] to Actor[akka://Main/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$b/$a#2073409209] to Actor[akka://Main/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$a/$a#-1459967586] to Actor[akka://Main/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$b/$b/$b#-2142577608] to Actor[akka://Main/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
scala.MatchError: Failure(akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://Main/user/app/$a#520493765]] after [60000 ms]. Sender[Actor[akka://Main/user/app#1966408365]] sent message of type "ParentMerger$SendHalf".) (of class scala.util.Failure) 
    at ParentMerger$$anonfun$receive$1.$anonfun$applyOrElse$1(ParentMerger.scala:67) 
    at ParentMerger$$anonfun$receive$1.$anonfun$applyOrElse$1$adapted(ParentMerger.scala:66) 
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) 
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140) 
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 

Answer

You are replying to the wrong actor:

sender() ! Merger.Reply(merge(arr1, arr2)) 

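sender() may no longer be valid when it is called from inside a Future callback, because the callback runs later, after the actor has finished handling the original message. Capture the sender outside the onComplete block: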

val originalSender = sender()
arrayFuture1.onComplete {
  case Success(Merger.Reply(arr1: ArrayBuffer[Int])) => {
    arrayFuture2.onComplete {
      case Success(Merger.Reply(arr2: ArrayBuffer[Int])) => {
        println("Child merge")
        originalSender ! Merger.Reply(merge(arr1, arr2))
      }
    }
  }
}
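
A variation on the same fix, offered as a minimal sketch rather than the asker's exact code: combine the two ask futures with a for-comprehension and hand the result to pipeTo, using a sender captured up front. The message definitions and the merge helper below are assumptions, since the question does not show them. With this shape, a failed or timed-out child is forwarded to the recipient as a failure instead of surfacing as a scala.MatchError from an onComplete block that only matches Success.

```scala
import akka.actor.{Actor, ActorRef, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._

// Message types mirror the names in the question; their exact definitions are assumed.
object ParentMerger { final case class SendHalf(array: ArrayBuffer[Int]) }
object Merger { final case class Reply(array: ArrayBuffer[Int]) }

class Merger extends Actor {
  import context.dispatcher // ExecutionContext for the Future combinators and pipeTo
  implicit val timeout: Timeout = Timeout(60.seconds)

  // Hypothetical stand-in for the question's merge helper, which is not shown.
  private def merge(a: ArrayBuffer[Int], b: ArrayBuffer[Int]): ArrayBuffer[Int] = {
    val out = new ArrayBuffer[Int](a.length + b.length)
    var i = 0
    var j = 0
    while (i < a.length && j < b.length) {
      if (a(i) <= b(j)) { out += a(i); i += 1 } else { out += b(j); j += 1 }
    }
    while (i < a.length) { out += a(i); i += 1 }
    while (j < b.length) { out += b(j); j += 1 }
    out
  }

  def receive: Receive = {
    case ParentMerger.SendHalf(array) if array.length <= 1 =>
      sender() ! Merger.Reply(array) // safe: still inside receive

    case ParentMerger.SendHalf(array) =>
      val replyTo: ActorRef = sender() // capture before any asynchronous callback runs
      val (left, right) = array.splitAt(array.length / 2)
      val f1 = (context.actorOf(Props[Merger]()) ? ParentMerger.SendHalf(left)).mapTo[Merger.Reply]
      val f2 = (context.actorOf(Props[Merger]()) ? ParentMerger.SendHalf(right)).mapTo[Merger.Reply]
      // Both asks are already in flight; the for-comprehension only combines their results.
      val merged = for { r1 <- f1; r2 <- f2 } yield Merger.Reply(merge(r1.array, r2.array))
      merged.pipeTo(replyTo) // a failure is delivered to replyTo as akka.actor.Status.Failure
  }
}
```

The callback never touches sender() or any other actor state, so later refactoring of the future chain cannot silently change who receives the reply.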

That fixed it. Thanks! So if I put sender() inside the onComplete block, does it refer to the sender of Merger.Reply?

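It is not obvious to me what exactly it would be. It might be *your* actor (i.e. 'self'), but the doubly nested 'onComplete' blocks make me think it could be some system actor that you cannot even access. I have learned that it is generally best to avoid the 'ask' pattern: it is very vulnerable to *apparently innocent* refactorings that lead to code with cryptic (and silent) failures.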
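
For illustration, here is one way the Merger could look without 'ask' at all. This is only a sketch under assumptions: the Sort and Sorted messages and the merge helper are hypothetical names, not taken from the question. Each actor sends its halves to two children with plain tell, remembers who asked, and switches behaviour with context.become until both replies have arrived.

```scala
import akka.actor.{Actor, ActorRef, Props}
import scala.collection.mutable.ArrayBuffer

object Merger {
  final case class Sort(array: ArrayBuffer[Int])   // hypothetical request message
  final case class Sorted(array: ArrayBuffer[Int]) // hypothetical reply message

  // Hypothetical merge of two already sorted buffers.
  def merge(a: ArrayBuffer[Int], b: ArrayBuffer[Int]): ArrayBuffer[Int] = {
    val out = new ArrayBuffer[Int](a.length + b.length)
    var i = 0
    var j = 0
    while (i < a.length && j < b.length) {
      if (a(i) <= b(j)) { out += a(i); i += 1 } else { out += b(j); j += 1 }
    }
    while (i < a.length) { out += a(i); i += 1 }
    while (j < b.length) { out += b(j); j += 1 }
    out
  }
}

class Merger extends Actor {
  import Merger._

  def receive: Receive = {
    case Sort(array) if array.length <= 1 =>
      sender() ! Sorted(array)
      context.stop(self) // this actor handles exactly one request

    case Sort(array) =>
      val (left, right) = array.splitAt(array.length / 2)
      context.actorOf(Props[Merger]()) ! Sort(left)
      context.actorOf(Props[Merger]()) ! Sort(right)
      // sender() is evaluated here, inside receive, and stored in the next behaviour
      context.become(awaitingFirst(sender()))
  }

  private def awaitingFirst(replyTo: ActorRef): Receive = {
    case Sorted(first) => context.become(awaitingSecond(replyTo, first))
  }

  private def awaitingSecond(replyTo: ActorRef, first: ArrayBuffer[Int]): Receive = {
    case Sorted(second) =>
      // Arrival order does not matter because merge is symmetric in its sorted inputs.
      replyTo ! Sorted(merge(first, second))
      context.stop(self)
  }
}
```

Because every reply is handled as an ordinary message, there are no timeouts to tune and no callbacks running outside the actor, at the cost of spelling out the intermediate states by hand.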