2017-02-22 55 views
1

我在conf文件中有以下條目。但我不知道這是否調度程序設置被拾起,什麼是正在使用如何檢查akka應用程序中配置了哪個調度程序

 akka{ 
     actor{ 
     default-dispatcher { 
       type = Dispatcher 
       executor = "fork-join-executor" 
       throughput = 3 
       fork-join-executor { 
       parallelism-min = 40 
       parallelism-factor = 10 
       parallelism-max = 100 
       } 
      } 
     } 
    } 

終極並行數值我已經8核的機器,所以我希望並行線程是處於就緒狀態 40分鐘(8×10因子)< 100max。我想看看akka使用最大並行線程的價值。 我創建了45個子actor,在我的日誌中,我正在打印線程ID [application-akka.actor.default-dispatcher-xx],並且看不到超過20個線程並行運行。

回答

0

假設您有一個ActorSystem實例,您可以檢查在其配置中設置的值。這是你可以如何得到你在配置文件中設置的值:

val system = ActorSystem() 
val config = system.settings.config.getConfig("akka.actor.default-dispatcher") 
config.getString("type") 
config.getString("executor") 
config.getString("throughput") 
config.getInt("fork-join-executor.parallelism-min") 
config.getInt("fork-join-executor.parallelism-max") 
config.getDouble("fork-join-executor.parallelism-factor") 

我希望這有助於。您也可以參考this頁面瞭解有關特定配置設置的更多詳細信息。

更新

我已經挖了一點在阿卡找出到底是什麼,它使用了您的設置。正如你可能已經預計它使用ForkJoinPool。用於構建它的並行計算公式如下:

object ThreadPoolConfig { 
    ... 
    def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int = 
math.min(math.max((Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt, floor), ceiling) 
    ... 
} 

該功能用於在某個點建立一個ForkJoinExecutorServiceFactory

new ForkJoinExecutorServiceFactory(
    validate(tf), 
    ThreadPoolConfig.scaledPoolSize(
    config.getInt("parallelism-min"), 
    config.getDouble("parallelism-factor"), 
    config.getInt("parallelism-max")), 
    asyncMode) 

無論如何,這是將要使用的並行創建ForkJoinPool,這實際上是java.lang.ForkJoinPool的一個實例。現在我們必須問這個池有多少個線程?簡單的答案是,只有在需要時纔會使用整個容量(在我們的案例中爲80個)。

爲了說明這種情況,我已經在actor中使用了Thread.sleep的各種用法,進行了一些測試。我發現的是,它可以在大約10個線程(如果沒有進行睡眠呼叫)的某處使用到最多80個線程(如果我叫睡1秒)。測試是在一臺8芯機器上進行的。

綜上所述,您需要檢查Akka使用的實現,以確切瞭解如何使用這種並行性,這就是我查看ForkJoinPool的原因。除了看配置文件,然後檢查特定的實現,我不認爲你可以做不幸:(

我希望這個澄清答案 - 最初我以爲你想看看如何配置演員系統的調度。

+0

是的,這會給出由akka系統從配置讀取的值。不過,我很想知道是否有辦法知道akka調度員最終從3個字段中選擇的並行度 - min,factor和max。 – Sree

+0

我在運行一些測試後更新了響應。另外,@Stefano Bonetti是對的 - 爲了最大限度地發揮所有演員需要同時工作的並行性:) –

0

爲了最大化並行性因子,所有參與者都需要同時處理一些消息。您確定這是您的應用程序中的情況嗎?

就拿下面的代碼

object Test extends App { 

    val system = ActorSystem() 

    (1 to 80).foreach{ _ => 
    val ref = system.actorOf(Props[Sleeper]) 
    ref ! "hello" 
    } 
} 

class Sleeper extends Actor { 
    override def receive: Receive = { 
    case msg => 
     //Thread.sleep(60000) 
     println(msg) 
    } 
} 

如果你認爲你的配置和8個內核,你會看到正在產生的線程少量(4,5?)作爲消息的處理過快速建立一些真正的並行性。

相反,如果你把你的演員CPU忙取消對討厭Thread.sleep,你會看到線程數會一下子提高到80。然而,這隻會持續1分鐘,在此之後,線程將被逐漸被從游泳池退休。

我想主要的訣竅是:不要想到每個演員都在一個單獨的線程上運行。每當一個或多個消息出現在調度員醒來的參與者郵箱上,並且確實將消息處理任務分派到指定的池中。

+0

我有相當類似的代碼。我在for循環中產生了45名演員。根據我的日誌,在兒童演員中完成計算大約需要1到1.5秒。我希望能有一個線程派發給每個孩子演員並行。我想這不會發生。任何方式來執行它? – Sree

+0

如果沒有足夠的任務,則無法強制執行45個正在生成的線程。機會是,在你的情況下沒有。沒有看到你的代碼,很難說。 –

相關問題