演員一次處理一個消息。處理多條消息的經典模式是讓一個協調員角色面向一組消費者角色。如果使用react,那麼消費池可能很大,但仍然只使用少量的JVM線程。這裏有一個例子,我創建了一個由10名消費者和一名協調員組成的池。
import scala.actors.Actor
import scala.actors.Actor._
case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop
def consumer(n : Int) = actor {
loop {
react {
case Ready(sender) =>
sender ! Ready(self)
case Request(sender, payload) =>
println("request to consumer " + n + " with " + payload)
// some silly computation so the process takes awhile
val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
sender ! Result(result)
println("consumer " + n + " is done processing " + result)
case Stop => exit
}
}
}
// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)
val coordinator = actor {
loop {
react {
case msg @ Request(sender, payload) =>
consumers foreach {_ ! Ready(self)}
react {
// send the request to the first available consumer
case Ready(consumer) => consumer ! msg
}
case Stop =>
consumers foreach {_ ! Stop}
exit
}
}
}
// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)
此代碼測試以查看哪些消費者可用並向該消費者發送請求。替代方案是隨機分配給消費者或使用循環調度程序。
根據你在做什麼,你可能會更好地服務於斯卡拉的期貨。例如,如果你真的不需要演員,那麼所有上述機器可以寫爲
import scala.actors.Futures._
def transform(payload : String) = {
val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
println("transformed " + payload + " to " + result)
result
}
val results = for (i <- 0 to 1000) yield future(transform(i.toString))
我真的不明白這個問題是如何證明downvote。如果我可以閱讀關於Scala編程中演員的完整章節並且不確定答案,那麼這是一個完全有效的問題。 – 2009-06-18 07:33:55
單個參與者按順序處理消息,因爲這個想法是可以使用參與者調解對(另外的)共享資源的訪問,並且不需要共享資源上的其他同步。如果消息可以按順序處理,則需要在演員中使用鎖,以確保不會發生同步錯誤。 – 2010-01-04 06:27:15