您可以使用自己的 PriorityMailbox,讓它記錄消息的到達時間,並將其用作附加優先級(用於「主」優先級相同的消息)。
像這樣(未測試):
import akka.dispatch._
import com.typesafe.config.Config
import akka.actor.{ActorRef, PoisonPill, ActorSystem}
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue
/**
 * Example mailbox type that ranks messages with a fixed priority table and
 * delegates the ordering to [[UnboundedTimedPriorityMailbox]].
 *
 * Priority values assigned here:
 *  - `'highpriority` -> 0
 *  - any other message -> 1
 *  - `'lowpriority` -> 2
 *  - `PoisonPill` -> 3
 *
 * The `settings` and `config` parameters are accepted but not read by this class.
 */
class MyTimedPriorityMailbox(settings: ActorSystem.Settings, config: Config)
  extends UnboundedTimedPriorityMailbox(
    TimedPriorityGenerator {
      case 'highpriority => 0
      case 'lowpriority  => 2
      case PoisonPill    => 3
      case _             => 1
    })
/**
 * Pairs an `Envelope` with the `System.nanoTime()` reading taken when this
 * wrapper is constructed.
 *
 * @param envelope the wrapped envelope
 */
case class TimedEnvelope(envelope: Envelope) {

  /** `System.nanoTime()` value captured once, at construction. */
  val timestamp: Long = System.nanoTime()
}
/**
 * Mailbox type whose message queue is a `PriorityBlockingQueue` of
 * [[TimedEnvelope]]s ordered by the supplied comparator.
 *
 * @param cmp             comparator handed to the backing queue
 * @param initialCapacity initial capacity handed to the backing queue
 */
class UnboundedTimedPriorityMailbox(
    final val cmp: Comparator[TimedEnvelope],
    final val initialCapacity: Int)
  extends MailboxType {

  /** Builds the mailbox with an initial capacity of 11. */
  def this(cmp: Comparator[TimedEnvelope]) = this(cmp, 11)

  /**
   * Creates a fresh queue. The returned object is itself the priority queue;
   * the mixed-in traits reach it through `queue`. Neither `owner` nor `system`
   * is read here.
   */
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new PriorityBlockingQueue[TimedEnvelope](initialCapacity, cmp)
      with TimedQueueBasedMessageQueue
      with TimedUnboundedMessageQueueSemantics {
      override def queue: java.util.Queue[TimedEnvelope] = this
    }
}
/**
 * `MessageQueue` operations that can be answered from a backing
 * `java.util.Queue` of [[TimedEnvelope]]s.
 */
trait TimedQueueBasedMessageQueue extends MessageQueue {

  /** The backing queue. */
  def queue: java.util.Queue[TimedEnvelope]

  /** Current size of the backing queue. */
  def numberOfMessages: Int = queue.size

  /** True when the backing queue is not empty. */
  def hasMessages: Boolean = !queue.isEmpty

  /**
   * Drains this queue into `deadLetters`: while `dequeue()` keeps returning a
   * non-null envelope, that envelope is enqueued on `deadLetters` for `owner`.
   * Nothing is dequeued when the queue reports no messages.
   */
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
    if (hasMessages) {
      Iterator
        .continually(dequeue())
        .takeWhile(_ ne null)
        .foreach(pending => deadLetters.enqueue(owner, pending))
    }
}
/**
 * Enqueue/dequeue operations that wrap envelopes in a [[TimedEnvelope]] on the
 * way in and unwrap them on the way out.
 */
trait TimedUnboundedMessageQueueSemantics extends TimedQueueBasedMessageQueue {

  /** Wraps `handle` in a new [[TimedEnvelope]] and adds it to the queue; `receiver` is not used. */
  def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
    queue.add(TimedEnvelope(handle))
  }

  /** Polls the queue and returns the unwrapped envelope, or `null` when the poll yields nothing. */
  def dequeue(): Envelope = queue.poll() match {
    case null  => null
    case timed => timed.envelope
  }
}
/** Factory for [[TimedPriorityGenerator]] instances. */
object TimedPriorityGenerator {

  /**
   * Builds a generator whose `gen` simply applies `priorityFunction` to the message.
   *
   * @param priorityFunction maps a message to its integer priority
   */
  def apply(priorityFunction: Any => Int): TimedPriorityGenerator =
    new TimedPriorityGenerator {
      def gen(message: Any): Int = priorityFunction(message)
    }
}
/**
 * Comparator for [[TimedEnvelope]]s: orders first by the priority that `gen`
 * assigns to the wrapped message, then by the envelope timestamps when the
 * priorities are equal.
 */
abstract class TimedPriorityGenerator extends java.util.Comparator[TimedEnvelope] {

  /**
   * Priority of `message`.
   *
   * @param message the payload of an envelope
   * @return the integer priority used as the primary sort key
   */
  def gen(message: Any): Int

  /**
   * Compares two timed envelopes.
   *
   * @return -1, 0 or 1: the order of the two priorities, or, when they are
   *         equal, the sign of `thisMessage.timestamp - thatMessage.timestamp`
   */
  final def compare(thisMessage: TimedEnvelope, thatMessage: TimedEnvelope): Int = {
    val thisPriority = gen(thisMessage.envelope.message)
    val thatPriority = gen(thatMessage.envelope.message)
    // Compare the priorities directly instead of subtracting them: the
    // subtraction overflows Int for values that are far apart.
    if (thisPriority < thatPriority) -1
    else if (thisPriority > thatPriority) 1
    else {
      // Tie-break on the timestamps. Take the sign of the Long difference
      // rather than truncating it with `.toInt`, which yields a wrong sign
      // (or zero) once the difference no longer fits in 32 bits.
      java.lang.Long.signum(thisMessage.timestamp - thatMessage.timestamp)
    }
  }
}
謝謝,請問上面的例子用的是哪個 Akka 版本?我正在使用 Akka 2.0.4,並且出現了一些編譯錯誤:'UnboundedTimedPriorityMailbox.create' 的方法參數是 'Option[ActorContext]',而且還需要定義方法 'cleanUp(ActorContext, MessageQueue)'。我已經做了一些修改,但我最好還是使用與你相同的版本。 – weima 2013-03-12 05:37:58
我使用的是 Akka 2.1.1 搭配 Scala 2.10.0 – 2013-03-12 09:52:18