2016-04-22 63 views
2

爲什麼線數繼續增加?Akka的線程數量不斷增加。什麼可能是錯的?

查看此圖片的右下角

Thread count keeps increasing

整個流程是這樣的:

Akka HTTP Server API 
    -> on http request, sendMessageTo DataProcessingActor 
     -> sendMessageTo StorageActor 
      -> sendMessageTo DataBaseActor 
      -> sendMessageTo IndexActor 

這是阿卡HTTP API的定義(在僞代碼):

Main { 
    path("input/") { 
    post { 
     dataProcessingActor forward message 
    } 
    } 
} 

以下是在角色定義(在僞代碼中):

DataProcessingActor { 
    case message => 
    message = parse message 
    storageActor ! message 
} 


StorageActor { 
    case message => 
    indexActor ! message 
    databaseActor ! message 
} 


DataBaseActor { 
    case message => 
    val c = get monogCollection 
    c.store(message) 
} 

IndexActor { 
    case message => 
    elasticSearch.index(message) 
} 

後,我運行此安裝程序,並在發送多個HTTP requsts爲 「輸入/」 HTTP終結,我得到的錯誤:

for(i <- 0 until 1000000) { 
    post("input/", someMessage+i) 
} 

錯誤:

[ERROR] [04/22/2016 13:20:54.016] [Main-akka.actor.default-dispatcher-15] [akka.tcp://[email protected]:2558/system/IO-TCP/selectors/$a/0] Accept error: could not accept new connection 
java.io.IOException: Too many open files 
    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) 
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) 
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) 
    at akka.io.TcpListener.acceptAllPending(TcpListener.scala:107) 
    at akka.io.TcpListener$$anonfun$bound$1.applyOrElse(TcpListener.scala:82) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480) 
    at akka.io.TcpListener.aroundReceive(TcpListener.scala:32) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:495) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

編輯1

這裏是使用的application.conf文件:

akka { 
    loglevel = "INFO" 
    stdout-loglevel = "INFO" 
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" 

    actor { 
    default-dispatcher { 
     throughput = 10 
    } 
    } 

    actor { 
    provider = "akka.remote.RemoteActorRefProvider" 
    } 

    remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
     hostname = "127.0.0.1" 
     port = 2558 
    } 
    } 
} 
+0

可能是蒙戈驅動 – Haspemulator

+0

只有正在使用一個蒙戈數據庫連接,並且它是在一個階對象'對象DB {lazy val db = mongodb.connect()}'。基本上,在一個演員內部沒有新的連接。新連接僅在首次初始化DB對象時生成,並引用'db'。它認爲它不能是MongoDB連接。我錯過了什麼嗎? – tuxdna

+0

我會附加一個調試器/分析器,看看那些成千上萬的線程正在做什麼。 – Haspemulator

回答

0

我發現ElasticSearch是個問題。我正在使用Java API for ElasticSearch,並且因爲它從Java API使用的方式而泄露套接字。現在按照此處所述解決。

下面是使用Java API

trait ESClient { def getClient(): Client } 

case class ElasticSearchService() extends ESClient { 
    def getClient(): Client = { 
    val client = new TransportClient().addTransportAddress(
     new InetSocketTransportAddress(Config.ES_HOST, Config.ES_PORT) 
    ) 
    client 
    } 
} 

這是這是造成泄漏的演員彈性搜索客戶服務:

class IndexerActor() extends Actor { 

    val elasticSearchSvc = new ElasticSearchService() 
    lazy val client = elasticSearchSvc.getClient() 

    override def preStart = { 
    // initialize index, and mappings etc. 
    } 

    def receive() = { 
    case message => 
     // do indexing here 
     indexMessage(ES.client, message) 
    } 
} 

注意:每次創建一個主角實例,正在建立新的連接。

每次調用new ElasticSearchService()都會創建一個到ElasticSearch的新連接。我動議到一個單獨的對象,如下所示,也演員使用該對象,而不是:

object ES { 
    val elasticSearchSvc = new ElasticSearchService() 
    lazy val client = elasticSearchSvc.getClient() 
} 


class IndexerActor() extends Actor { 

    override def preStart = { 
    // initialize index, and mappings etc. 
    } 

    def receive() = { 
    case message => 
     // do indexing here 
     indexMessage(ES.client, message) 
    } 
} 
相關問題