2016-09-27 59 views
0

我正在尋找akka調度程序使用的一些示例。我有一個演員(讓我們稱之爲 - dataProducer)實現從一些數據庫中檢索數據。我想寫一個調度程序actor,它將在5秒鐘間隔內爲dataProducer actor提供極點。如果數據檢索比調度程序間隔需要更多時間,那麼該如何處理這種情況。 Scheduler actor中的scheduleOnce方法會處理這個問題嗎?使用akka調度程序的數據推送

這裏是我的調度演員

import java.util.concurrent.{Executors, TimeUnit} 
import akka.actor.{Actor, Cancellable, Props} 
import scala.concurrent.ExecutionContext 

class SchedulerActor(interval: Long) extends Actor with LogF{ 

    implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(100)) 

    private var scheduler: Cancellable = _ 

    override def preStart(): Unit = { 
    import scala.concurrent.duration._ 
    scheduler = context.system.scheduler.schedule(
     initialDelay = 0 seconds, 
     interval = FiniteDuration(interval, TimeUnit.SECONDS), 
     receiver = self, 
     message = FetchData 
    ) 
    } 

    override def postStop(): Unit = { 
    scheduler.cancel() 
    } 

    def receive = { 
    case FetchData => 
     logger.debug("Fetch Data") 
     sender() ! "Data Fetched!!!" //here I'll call dataProducer API 
     true 
    case unknown => 
     throw new RuntimeException("ERROR: Received unknown message [" + unknown + "], can't handle it") 
    } 

} 

object SchedulerActor { 

    def props(interval: Long): Props = Props(new SchedulerActor(interval)) 
} 

sealed trait FetchDataMessage 
case object FetchData extends FetchDataMessage 
+0

如果你想得到有意義的答案,我建議你展示你到目前爲止嘗試過的。 – hasumedic

+0

更新了我的問題... – Abhay

回答

0

調度的scheduleOnce幫助一些延遲後執行一段。 有不同的狀態和狀態之間切換接受不同類型的消息,並採取相應的行動。但是當超時發生時,scheduleOnce會將你帶到timeoutState。

ScheduleOnce將幫助演員知道發生超時。

如果數據檢索需要比調度程序間隔更多的時間,如何處理這種情況?

如果數據獲取超過指定時間actor狀態更改timeoutState並處於超時狀態說明應該對actor執行什麼操作。您可以重試或嘗試不同的來源。

我想編寫一個調度演員,這將極在間隔5秒

在結果狀態等待scheduleOnce 5秒dataProducer演員延遲請求dataProducer和整個事情再次重複。

檢查此代碼以瞭解如何完成。

import akka.actor.{Actor, Cancellable} 
import stackoverflow.DBUtils.Entity 

import scala.concurrent.Future 
import scala.concurrent.duration._ 
import akka.pattern.pipe 


object DBPollActor { 
    case class Result(results: List[Entity]) 
    case object Schedule 
    case object Timeup 
    case object FetchData 
} 

object DBUtils { 
    case class Entity(name: String) 

    def doDBOperation: Future[List[Entity]] = { 
    Future.successful(List(Entity(name = "foo"))) 
    } 

} 

class DBPollActor(timeout: Int) extends Actor { 

    import DBPollActor._ 

    implicit val ex = context.system.dispatcher 

    var schedulerOpt: Option[Cancellable] = None 

    @scala.throws[Exception](classOf[Exception]) 
    override def preStart(): Unit = { 
    super.preStart() 
    self ! FetchData 
    schedulerOpt = Some(context.system.scheduler.scheduleOnce(timeout seconds) { 
     context become timeoutState 
     self ! Timeup 
    }) 
    } 

    override def receive: Receive = { 
    case [email protected] => 
     context become startState 
     self forward msg 
    } 

    def startState: Receive = { 
    case FetchData => 
     schedulerOpt.map(_.cancel()) 
     context become resultState 
     DBUtils.doDBOperation.map(Result) pipeTo self 
     schedulerOpt = Some(context.system.scheduler.scheduleOnce(timeout seconds) { 
     context become timeoutState 
     self ! Timeup 
     }) 
    } 

    def timeoutState: Receive = { 
    case Timeup => 
     schedulerOpt.map(_.cancel()) 
     //Timeout happened do something or repeat 
    } 

    def resultState: Receive = { 
    case [email protected](list) => 
     schedulerOpt.map(_.cancel()) 
     //Result available consume the result and repeat or doSomething different 
    context become resultState 
     DBUtils.doDBOperation.map(Result) pipeTo self 
     schedulerOpt = Some(context.system.scheduler.scheduleOnce(timeout seconds) { 
     context become timeoutState 
     self ! Timeup 
     }) 

    case ex: Exception => 
     schedulerOpt.map(_.cancel()) 
     //future failed exit or retry 
    } 
}