爲了避免OOM,我將一些Akka 1.1.3 actor的郵箱大小與共享的自定義分派器綁定在一起。例如:處理Akka actor有界的郵箱MessageQueueAppendFailedException
object Static {
val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(
"customDispatcher",
1000,
BoundedMailbox(capacity = 10)
)
}
class MyActor extends Actor {
self.dispatcher = Static.dispatcher
...
}
我想反應以郵箱溢出,所以我可以消息上游生產暫停(側注:它可悲看起來像actor.stop()
,等待和actor.start()
將拋出ActorStartException
)。在隊列填滿和隊列耗盡之間,一些數據丟失是可以接受的。
阿卡對Dispatchers章說
當試圖將消息發送到演員將拋出一個 MessageQueueAppendFailedException(「BlockingMessageTransferQueue 傳輸超時」),如果消息不能在被添加到郵箱 由pushTimeout指定的時間。
我在哪裏可以發現此異常?
該文檔使得聽起來像我需要將每個myActor ! message
包裝在try/catch中。是對的嗎?我真的想集中處理。我的Supervisor
可能攔截它並運行我的處理程序?
檢查它的目的是什麼? –
不知道這是否適用於您的特定情況,但如果您遇到可能導致郵箱溢出風險的情況,則可能需要考慮一種「拉」式設計,其中您的消費者演員要求來自制作人而不是製作人的作品垃圾郵件您的消費者。 –
@Viktor它採取上游行動,特別是暫停消費者(我會加入這個問題)。丹是對的,我一定會考慮拉模型。不過,我想現在就做很少的重新構建,並且數據丟失是可以接受的。 – Bluu