2013-03-11 133 views
3

生產者演員可以發送消息給其他演員以便立即處理嗎?即將消息發佈到消費者郵箱的頭部而不是消費者郵箱的尾部?akka演員發郵件給郵箱頭

我知道akka提供了一種配置我自己定義的郵箱類型的方法,但是如何控制是否需要將某些類型的郵件發佈到MailBox的頭部而不是尾部。 例如TimerMessages。我想要一個精確的計時器控制時間窗口實現。消息必須保持1000毫秒(比方說),如果消息處理消耗時間,並且在郵箱中有很多待處理的消息,我不希望計時器消息被追加到同一個隊列中。

我可以用一個PriorityMailBox,但PriorityMailBox麻煩的是,即使它可以在郵箱的負責人,對於相同優先級的消息提出了更高優先級的消息(計時器消息),消息的郵箱的順序並不保證與到貨順序相同。所以我也不能使用priorityMailBox。

有人可以告訴我我可以如何實現這種行爲?

回答

4

您可以使用自己的PriorityMailBox,它可以處理消息的到達時間並將其用作附加優先級(對於具有相同「主」優先級的消息)。

像這樣(未測試):

import akka.dispatch._ 
import com.typesafe.config.Config 
import akka.actor.{ActorRef, PoisonPill, ActorSystem} 
import java.util.Comparator 
import java.util.concurrent.PriorityBlockingQueue 

class MyTimedPriorityMailbox(settings: ActorSystem.Settings, config: Config) 
    extends UnboundedTimedPriorityMailbox(
    TimedPriorityGenerator { 
     case 'highpriority ⇒ 0 

     case 'lowpriority ⇒ 2 

     case PoisonPill ⇒ 3 

     case otherwise  ⇒ 1 
    }) 

case class TimedEnvelope(envelope: Envelope) { 
    private val _timestamp = System.nanoTime() 
    def timestamp = _timestamp 
} 

class UnboundedTimedPriorityMailbox(final val cmp: Comparator[TimedEnvelope], final val initialCapacity: Int) extends MailboxType { 
    def this(cmp: Comparator[TimedEnvelope]) = this(cmp, 11) 
    final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = 
    new PriorityBlockingQueue[TimedEnvelope](initialCapacity, cmp) with TimedQueueBasedMessageQueue with TimedUnboundedMessageQueueSemantics { 
     override def queue: java.util.Queue[TimedEnvelope] = this 
    } 
} 

trait TimedQueueBasedMessageQueue extends MessageQueue { 
    def queue: java.util.Queue[TimedEnvelope] 
    def numberOfMessages = queue.size 
    def hasMessages = !queue.isEmpty 
    def cleanUp(owner: ActorRef, deadLetters: MessageQueue) { 
    if (hasMessages) { 
     var envelope = dequeue() 
     while (envelope ne null) { 
     deadLetters.enqueue(owner, envelope) 
     envelope = dequeue() 
     } 
    } 
    } 
} 

trait TimedUnboundedMessageQueueSemantics extends TimedQueueBasedMessageQueue { 
    def enqueue(receiver: ActorRef, handle: Envelope) { queue add TimedEnvelope(handle) } 
    def dequeue(): Envelope = Option(queue.poll()).map(_.envelope).getOrElse(null) 
} 


object TimedPriorityGenerator { 
    def apply(priorityFunction: Any ⇒ Int): TimedPriorityGenerator = new TimedPriorityGenerator { 
    def gen(message: Any): Int = priorityFunction(message) 
    } 
} 


abstract class TimedPriorityGenerator extends java.util.Comparator[TimedEnvelope] { 
    def gen(message: Any): Int 

    final def compare(thisMessage: TimedEnvelope, thatMessage: TimedEnvelope): Int = { 
    val result = gen(thisMessage.envelope.message) - gen(thatMessage.envelope.message) 
    // Int.MaxValue/Int.MinValue check omitted 
    if(result == 0) (thisMessage.timestamp - thatMessage.timestamp).toInt else result 
    } 

} 
+0

謝謝,你用於上面例子的akka​​版本?我正在使用akka 2.0.4,並且出現一些編譯錯誤。 'UnboundedTimedPriorityMailbox.create'的方法參數是'Option [ActorContext]'和方法'cleanup(ActorContext,MessageQueue)'也需要定義。我已經做了一些修改,但是我最好使用與你的版本相同的版本。 – weima 2013-03-12 05:37:58

+0

我用Scala 2.10.0使用Akka 2.1.1 – 2013-03-12 09:52:18

2

上述代碼工程確定。

只是一個細節。避免使用System.getTimeNano()。它因爲它是由每個CPU邏輯

Here another post

然後定義在多核機器的問題,我們已在消息的奇怪的行爲順序Dependending上哪個CPU enque它。

我用經典的System.currentTimeMillis()改變它。它不太精確,但在我們的情況下,如果具有相同優先級和相同毫秒生成時間的兩條消息,則不關心它們被處理的順序。

感謝您的代碼!