2013-05-20 48 views
2

我試圖在ClusterRouter配置中向所有路由器廣播消息。我已經嘗試了兩種選擇。這一個:向Akka的ClusterRouter中的管理員發送廣播消息

val workerRouter = context.actorOf(Props[ClusterRouter].withRouter(
    ClusterRouterConfig(AdaptiveLoadBalancingRouter(metrics), ClusterRouterSettings(
     totalInstances = 100, routeesPath = "/user/slave", 
     allowLocalRoutees = true, useRole = None))), name = "slaveRouter") 

    context.system.scheduler.schedule(2 seconds, 5 seconds, workerRouter, Broadcast(CapabilityRequest)) 

這一個:

val broadcastRouter = context.actorOf(Props[ClusterRouter].withRouter(
    ClusterRouterConfig(BroadcastRouter(Nil), ClusterRouterSettings(
     totalInstances = 100, routeesPath = "/user/slave", 
     allowLocalRoutees = true, useRole = None))), name = "slaveRouter") 

    context.system.scheduler.schedule(2 seconds, 5 seconds, broadcastRouter, CapabilityRequest) 

但對他們倆的,只有slaves的一個接收消息。思考?


爲了理解爲什麼我相信第一次嘗試應該有工作,一個人看AdaptiveLoadBalancingRounter.scala,在AdaptiveLoadBalancingRouterLike特點,創建Route時:

{ 
    case (sender, message) ⇒ 
    message match { 
     case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) 
     case msg   ⇒ List(Destination(sender, getNext())) 
    } 
} 
+0

正如我在郵件列表上所詢問的那樣:您能否在發送郵件時提供羣集實際上擁有多個成員的證明? –

+0

我可以給你整個代碼,是的,但是,然後再次:(i)RoundRobinRouter向每個成員發送消息,(ii)通過遍歷網絡中所有成員的「手動」廣播似乎工作。 –

回答

2

在你的第一個例子,您正在使用僅發送給一個路由器的路由器。從我讀過的文檔中,該路由器將使用來自不同節點的可用指標來選擇看起來受到最少脅迫的節點,並向該節點上的路由器發送消息。我認爲你所看到的這種設置的行爲是可以預料的。

在第二個例子中,我沒有看到關於在集羣環境中使用BraodcastRouter的文檔中的任何內容,所以我不確定這種方法是否受支持。話雖如此,我的猜測是創建帶有空路由列表(Nil)的BraodcastRouter是導致您所看到的行爲的原因。我認爲如果您將其更改爲BroadcastRouter(100),您可能會看到不同的行爲。但是,我不認爲(基於文檔中缺少示例)支持使用BroadcastRouter(並且我可能是錯的)。

你能解釋一下你的用例,所以我可以理解你爲什麼需要一個廣播類型路由器爲你的集羣?

編輯

FWIW,我得到的東西用下面的代碼工作。首先,配置:

akka { 
    actor { 
    provider = "akka.cluster.ClusterActorRefProvider" 
    } 
    remote { 
    transport = "akka.remote.netty.NettyRemoteTransport" 
    log-remote-lifecycle-events = off 
    netty { 
     hostname = "127.0.0.1" 
     port = 0 
    } 
    } 

    cluster { 
    min-nr-of-members = 2 
    seed-nodes = [ 
     "akka://[email protected]:2551", 
     "akka://[email protected]:2552"] 

    auto-down = on 
    } 
} 

於是,我開始了使用下面的代碼的兩個節點(一個2551對2552的除外):

object ClusterNode { 

    def main(args: Array[String]): Unit = { 

    // Override the configuration of the port 
    // when specified as program argument 
    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) 


    // Create an Akka system 
    val system = ActorSystem("ClusterSystem") 
    val clusterListener = system.actorOf(Props(new Actor with ActorLogging { 
     def receive = { 
     case state: CurrentClusterState => 
      log.info("Current members: {}", state.members) 
     case MemberJoined(member) => 
      log.info("Member joined: {}", member) 
     case MemberUp(member) => 
      log.info("Member is Up: {}", member) 
     case UnreachableMember(member) => 
      log.info("Member detected as unreachable: {}", member) 
     case _: ClusterDomainEvent => // ignore 

     } 
    }), name = "clusterListener") 

    Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])  
    } 

} 

class FooActor extends Actor{ 

    override def preStart = { 
    println("Foo actor started on path: " + context.self.path) 
    } 

    def receive = { 
    case msg => println(context.self.path + " received message: " + msg) 
    } 
} 

我於是開了第三屆「節點」,我的客戶端節點,使用下面的代碼:

object ClusterClient { 
    def main(args: Array[String]) { 
    val system = ActorSystem("ClusterSystem") 

    Cluster(system) registerOnMemberUp{ 
     val router = system.actorOf(Props[FooActor].withRouter(
     ClusterRouterConfig(AdaptiveLoadBalancingRouter(HeapMetricsSelector), 
     ClusterRouterSettings(
     totalInstances = 20, maxInstancesPerNode = 10, 
     allowLocalRoutees = false))), 
     name = "fooRouter") 

    router ! Broadcast("bar") 
    } 
    } 
} 

當發送的消息時,我看到了它在這兩個服務器節點的虛擬機,每個虛擬機10名演員獲得好評。

我的路由器和你的路由器之間的區別是,我沒有指定本地路由,我換了routeesPathmaxInstancesPerNode。我希望這有幫助。

+0

在原始文章中查看我的更新。謝謝。 –

+0

有趣。我通過源代碼查看了所有內容,而且我也沒有看到一個很好的解釋,爲什麼這不起作用。在發送廣播消息之前,您是否看到創建了100個從站? – cmbaxter

+0

添加了更多信息,包括工作代碼示例 – cmbaxter