2014-12-02 56 views
0

我有一個期貨池,每個未來的作品都與同一個AKKA演員系統 - 系統中的一些演員應該是全球性的,有些僅在未來使用。akka演員選擇沒有競賽條件

val longFutures = for (i <- 0 until 2) yield Future { 
    val p:Page = PhantomExecutor(isDebug=true) 
    Await.result(p.open("http://www.stackoverflow.com/") ,timeout = 10.seconds) 
    } 

PhantomExecutor tryes使用使用system.actorSelection

def selectActor[T <: Actor : ClassTag](system:ActorSystem,name:String) = { 
    val timeout = Timeout(0.1 seconds) 
    val myFutureStuff = system.actorSelection("akka://"+system.name+"/user/"+name) 
    val aid:ActorIdentity = Await.result(myFutureStuff.ask(Identify(1))(timeout).mapTo[ActorIdentity], 
     0.1 seconds) 

    aid.ref match { 
     case Some(cacher) => 
     cacher 
     case None => 
     system.actorOf(Props[T],name) 
    } 
    } 

一個全球共享的演員(簡單的增量計數器),但在併發環境中這種方法並沒有因爲比賽條件下工作。

我知道這個問題只有一個解決方案 - 在分解到期貨之前創建全球主角。但這意味着我無法封裝頂級庫用戶的大量隱藏工作。

回答

1

你是對的,確保全球演員首先被初始化是正確的方法。你不能將它們綁定到伴侶對象並從那裏引用它們,所以你知道它們只會被初始化一次嗎?如果你真的不能這樣做,那麼你可以嘗試這樣的東西來查找或創建演員。它類似於你的代碼,但它包含的邏輯通過查找回去/創建,如果比賽條件被擊中(最多隻能到最大次數)邏輯(遞歸):

def findOrCreateActor[T <: Actor : ClassTag](system:ActorSystem, name:String, maxAttempts:Int = 5):ActorRef = { 
    import system.dispatcher 
    val timeout = 0.1 seconds 

    def doFindOrCreate(depth:Int = 0):ActorRef = { 
     if (depth >= maxAttempts) 
     throw new RuntimeException(s"Can not create actor with name $name and reached max attempts of $maxAttempts") 

     val selection = system.actorSelection(s"/user/$name") 
     val fut = selection.resolveOne(timeout).map(Some(_)).recover{ 
     case ex:ActorNotFound => None 
     } 
     val refOpt = Await.result(fut, timeout) 

     refOpt match { 
     case Some(ref) => ref 
     case None => util.Try(system.actorOf(Props[T],name)).getOrElse(doFindOrCreate(depth + 1)) 
     } 
    } 

    doFindOrCreate() 
    } 

現在重試邏輯會在創建actor時觸發任何異常,因此您可能需要進一步指定(可能通過另一個recover組合器),只在獲得InvalidActorNameException時遞歸,但您明白了。

0

您可能需要考慮創建一個負責創建「反」演員的經理演員。這樣你就可以確保counter actor創建請求被序列化。

object CounterManagerActor { 
    case class SelectActorRequest(name : String) 
    case class SelectActorResponse(name : String, actorRef : ActorRef) 
} 

class CounterManagerActor extends Actor { 
    def receive = { 
    case SelectActorRequest(name) => { 
     sender() ! SelectActorResponse(name, selectActor(name)) 
    } 
    } 

    private def selectActor(name : String) = { 
    // a slightly modified version of the original selectActor() method 
    ??? 
    } 
} 
+0

但我應避免兩次運行此CounterManager演員,或者運行這個演員的未來之外{...}調用 - 實際上這是我的問題,如何避免這種結構沒有小把戲,比如忽略了雙演員或連續異常選擇。 – Oleg 2014-12-02 18:29:51