2017-08-04 124 views
0

我最近正在學習Akka Actor。我讀了演員中調度員的文件。我對演員中的阻止操作感到好奇。文檔中最後的topic描述瞭如何解決該問題。我試圖重現文檔中的示例實驗。演員中的阻止操作不佔用所有默認調度員

這裏是我的代碼:

package dispatcher 

import akka.actor.{ActorSystem, Props} 
import com.typesafe.config.ConfigFactory 

object Main extends App{ 

    var config = ConfigFactory.parseString(
    """ 
     |my-dispatcher{ 
     |type = Dispatcher 
     | 
     |executor = "fork-join-executor" 
     | 
     |fork-join-executor{ 
     |fixed-pool-size = 32 
     |} 
     |throughput = 1 
     |} 
    """.stripMargin) 

// val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf")) 


    val system = ActorSystem("block") 


    val actor1 = system.actorOf(Props(new BlockingFutureActor())) 
    val actor2 = system.actorOf(Props(new PrintActor())) 

    for(i <- 1 to 1000){ 
    actor1 ! i 
    actor2 ! i 
    } 

} 

package dispatcher 

import akka.actor.Actor 

import scala.concurrent.{ExecutionContext, Future} 

class BlockingFutureActor extends Actor{ 
    override def receive: Receive = { 
    case i: Int => 
     Thread.sleep(5000) 
     implicit val excutionContext: ExecutionContext = context.dispatcher 
     Future { 
     Thread.sleep(5000) 
     println(s"Blocking future finished ${i}") 
     } 
    } 
} 
package dispatcher 

import akka.actor.Actor 

class PrintActor extends Actor{ 
    override def receive: Receive = { 
    case i: Int => 
     println(s"PrintActor: ${i}") 
    } 
} 

我只是創建默認的調度員的ActorSystem和所有演員取決於這些。 BlockingFutureActor具有封裝操作,封裝在Future中。 PrintActor僅僅是打印一個數字。

在文檔的說明中,BlockingFutureActor中的默認調度程序將被Future s佔用,導致PrintActor的消息阻塞。該應用程序卡住在某處:

> PrintActor: 44 
> PrintActor: 45 

不幸的是,我的代碼沒有被阻止。 PrintActor的所有輸出顯示順利。但是BlockingFutureActor的輸出顯示出像擠壓牙膏。我嘗試監視的IntelliJ的調試我的線程信息,我得到: thread monitoring

你會發現只有兩個調度員正在睡覺(BlockingFutureActor使這種情況發生)。其他人正在等待,這意味着他們可用於新消息傳遞。

我已閱讀關於在演員中阻止操作的回答(page)。有人引用說,「調度員實際上是線程池,分離兩個保證,即緩慢的阻塞操作不會使另一個餓死,這種方法通常被稱爲批量標題,因爲它的思想是如果應用程序的一部分失敗,其餘部分保持響應。「

默認調度程序是否將一些調度程序用於阻塞操作?這樣即使有很多阻塞操作要求調度員,系統也可以處理消息。

可以複製Akka文件中的實驗嗎?我的配置有問題嗎?

感謝您的建議。最好的祝願。

+0

「從'PrintActor'顯示一切順利產出。」你是說你從'PrintActor'看到所有1000條'println'語句嗎? – chunjef

+0

是的,確切地說。當應用程序啓動時,1000'println'出現。 – jiexray

回答

2

你看到從PrintActor所有1000條打印語句從BlockingFutureActor任何打印語句之前,究其原因是因爲在BlockingFutureActorreceive塊中的第一Thread.sleep通話。這Thread.sleep是官方文檔中的代碼和例子之間的主要區別:

override def receive: Receive = { 
    case i: Int => 
    Thread.sleep(5000) // <----- this call is not in the example in the official docs 
    implicit val excutionContext: ExecutionContext = context.dispatcher 
    Future { 
     ... 
    } 
} 

記住,演員一次處理一個消息。 Thread.sleep(5000)基本上模擬了至少需要五秒鐘才能處理的消息。 BlockingFutureActor在完成處理當前消息之前不會處理另一條消息,即使它的郵箱中有數百條消息。雖然BlockingFutureActor正在處理第Int號值爲1的消息,但PrintActor已處理完所有發送給它的1000條消息。爲了使這更清楚,讓我們添加一個println聲明:

override def receive: Receive = { 
    case i: Int => 
    println(s"Entering BlockingFutureActor's receive: $i") // <----- 
    Thread.sleep(5000) 
    implicit val excutionContext: ExecutionContext = context.dispatcher 
    Future { 
     ... 
    } 
} 

樣本輸出,當我們運行程序:

Entering BlockingFutureActor's receive: 1 
PrintActor: 1 
PrintActor: 2 
PrintActor: 3 
... 
PrintActor: 1000 
Entering BlockingFutureActor's receive: 2 
Entering BlockingFutureActor's receive: 3 
Blocking future finished 1 
... 

正如你所看到的,由BlockingFutureActor實際開始處理的時間消息2PrintActor已經攪動了所有1000條消息。

如果您刪除第一個Thread.sleep,那麼您將看到郵件從BlockingFutureActor的郵箱中更快地出列,因爲工作正在「委派」到Future。一旦創建Future,演員將從其郵箱中抓取下一封郵件,而不等待Future完成。下面是不首先Thread.sleep樣本輸出(它不會完全一樣,每次被你運行它):

Entering BlockingFutureActor's receive: 1 
PrintActor: 1 
PrintActor: 2 
... 
PrintActor: 84 
PrintActor: 85 
Entering BlockingFutureActor's receive: 2 
Entering BlockingFutureActor's receive: 3 
Entering BlockingFutureActor's receive: 4 
Entering BlockingFutureActor's receive: 5 
PrintActor: 86 
PrintActor: 87 
... 
+0

因爲我的粗心大意,我覺得非常抱歉犯了一個愚蠢的錯誤。 「Thread.sleep(5000)」應該被封裝在「Future」中。而我的第一個'Thread.sleep(5000)'是一個愚蠢的bug。我非常感謝您的耐心和幫助。最好的祝願。 – jiexray