我有一個工作池與負載均衡,它被定義爲一種方法來路由消息如下:是否有根據阿卡(斯卡拉)藏匿大小
class Worker(workerNr: Int) extends Actor with Stash
...
val workers = (1 to poolSize).map(c => context.actorOf(Props(() => new Worker(c)).withDispatcher("stash-dispatcher"), "worker" + c))
val pool = context.actorOf(Props[Worker].withRouter(SmallestMailboxRouter(routees = workers)))
...
pool ! Request("do something")
現在這個工人的演員是不是無狀態並且在他將請求轉發給另一個參與者(執行實際工作)並將所有下面的請求存儲起來之前使用,直到他獲得當前請求的響應(可能需要一段時間)。然後,他將響應發送給請求的演員,將所有存儲的消息分離出來,並在切換回不成功後處理下一個請求。
case [email protected](_) => {
val requestor = sender
requestHandler ! request
become {
case [email protected](_) => {
requestor ! response
unstashAll
unbecome
}
case msg => stash
}
}
我的問題是我正在使用的SmallestMailboxRouter。它將郵件路由到郵箱最小的工作人員。但是由於工作人員沒有阻止,並且存儲了他們目前無法處理的郵件,所以他們的郵箱總是非常空(與他們的存儲相反)。
我想要一個路由器,它將消息路由到最小隱藏的工作者。我以爲自己實現了一個Router,但是看着implementation of Stash,似乎我甚至無法訪問存儲大小,因爲存儲本身對存儲特性是私有的。
private var theStash = Vector.empty[Envelope]
有沒有辦法做到這一點,或者這是實現與負載均衡工作池錯誤的做法?
爲什麼不在路由器級別保存每個存儲的大小記錄,即每次將消息發送給給定的工作人員時會更新的地圖或其他內容? – Peter
也許這種模式是可能適合您的問題的東西:http://www.michaelpollmeier.com/akka-work-pulling-pattern/ – cmbaxter
@cmbaxter謝謝,這正是我需要的 – lSoleyl