2017-05-04 70 views
1

我想創建一個使用遠程actor的示例Akka應用程序。目標是創建例如16個以順序方式交換消息的演員(演員16與演員15,15到14等會談,並且1與演員16會談)。但是,我在通信方面遇到了問題,因爲我連續出現此錯誤。使用Akka創建消息環的死信

[INFO] [2017年5月4日15:45:58.248] [ActorFlasks-akka.actor.default-調度-4] [阿卡:// ActorFlasks/deadLetters]消息[java.lang中。[String] from Actor [akka:// ActorFlasks/user/16#-2022012132] to Actor [akka:// ActorFlasks/deadLetters] was not delivery。 [1]遇到遇難信 。

爲此,我運行應用程序的16個終端實例,並始終使用不同的配置文件。我創建在每種情況下actorsystem像這樣:

object Main extends App { 

    val localId = args(0) 

    val configFile = getClass.getClassLoader.getResource(s"application$localId.conf").getFile 
    val config = ConfigFactory.parseFile(new File(configFile)) 
    val system = ActorSystem("ActorFlasks" , config) 
    val remote = system.actorOf(Props[CyclonManager], name=localId) 

    remote ! "START" 
} 

配置文件的一個例子是這樣的:

akka { 
    actor { 
    provider = remote 
    } 
    remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
     hostname = "localhost" 
     port = 50001 
    } 
} 
} 

和演員的定義是這樣的:

class CyclonManager extends Actor { 

    def propagateMessage(): Unit = { 
    val localId = self.path.name.toInt 
    val currentPort = 50000 + localId 
    val nextHopPort = if (currentPort == 50001) 50016 else currentPort - 1 
    val nextHopId = localId-1 

    val nextHopRef = context.actorSelection(s"akka.tcp://[email protected]:$nextHopPort/user/$nextHopId") 

    nextHopRef ! "NEXT" 
    } 

    override def receive: Receive = { 
    case "START" => 
     if (self.path.name == "16") { 
     propagateMessage() 
     } 
    case "NEXT" => 
     propagateMessage() 
    case _ => 
     println("Unrecognized message") 
    } 
} 

這是一個讓我開始的簡單例子,但無論我嘗試什麼,我都無法實現它。有人知道我失敗的地方嗎?

謝謝你在前進,

編輯:

akka { 
    actor { 
    provider = "akka.remote.RemoteActorRefProvider" 
    } 
    remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
     hostname = "localhost" 
     port = 50015 
    } 
} 
} 
+0

你確定演員#15存在於端口50015嗎? – Josef

+0

除非我對如何創建演員和演員系統有錯誤的理解,是的,我敢肯定。 – PablodeAcero

回答

1

重建並運行你的例子後,我發現在propagateMessage功能一個錯誤。

val nextHopId = localId-1 

應該

val nextHopId = if (currentPort == 50001) 16 else localId-1 

如果不解決您的問題,請嘗試運行我的快速和骯髒的,但工作代碼,看看是如何從你這不同:https://gist.github.com/grantzvolsky/4a53ce78610038a9d44788d7151dc416

在我代碼我只使用角色14,15和16.你可以運行每個使用sbt "run 16"等。

+0

是的,我只是在提交問題後纔看到這個錯誤,但修復它並沒有幫助。我將我的代碼更改爲您的代碼,但它仍然無法工作......問題可能與我的本地主機配置有關嗎?或者使用我正在使用的akka​​版本(2.5.0) – PablodeAcero

+0

我添加了'akka-remote'依賴項,它位於我的build.sbt的最後一行。從理論上講,你的防火牆可能會阻止通信,但似乎不太可能。你會提供所有三個節點的全部輸出嗎?這裏是我的:(注意我首先啓動節點14,然後是15,然後是16):http://imgur.com/a/l53pV – Josef

+0

嗨!我試過你的代碼和配置。節點14和15具有正常輸出。但是,節點16給我以下內容:http://imgur.com/a/tZi6T – PablodeAcero