2016-06-09 83 views
5

我想UTIL添加after(d: FiniteDuration)(callback: => Unit) Scala的Future s表示將使我能夠做到這一點:如何將基於時間的觀察者添加到Scala Future?

val f = Future(someTask) 

f.after(30.seconds) { 
    println("f has not completed in 30 seconds!") 
} 

f.after(60.seconds) { 
    println("f has not completed in 60 seconds!") 
} 

我怎樣才能做到這一點?

回答

0

一種方法是使用Future.firstCompletedOf(見本blogpost):

val timeoutFuture = Future { Thread.sleep(500); throw new TimeoutException } 

val f = Future.firstCompletedOf(List(f, timeoutFuture)) 
f.map { case e: TimeoutException => println("f has not completed in 0.5 seconds!") } 

其中TimeoutException一些異常或類型。

+0

但是'firstCompletedOf'不會取消其他未來如果第一個返回。所以如果我的期貨的大部分時間是最後幾毫秒,但我想在30秒後添加一條調試語句,我將創建大量的Thread.sleep(30000),它不會被正確取消? – pathikrit

+0

@pathikrit是的,但結果將被扔掉。如果這是一個無阻塞的未來(例如,博客文章中的'val timeoutFuture = akka.pattern.after(500.milliseconds,using = system.scheduler){...}',那麼我認爲這不是問題(它不會阻塞線程)。 –

0

使用import akka.pattern.after。如果你想實現它沒有AKK這裏是source code。另一個(java)示例是中的TimeoutFuture

1

我通常使用一個線程池執行人及承諾:

import scala.concurrent.duration._ 
import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor} 
import scala.concurrent.{Future, Promise} 

val f: Future[Int] = ??? 

val executor = new ScheduledThreadPoolExecutor(2, Executors.defaultThreadFactory(), AbortPolicy) 

def withDelay[T](operation: ⇒ T)(by: FiniteDuration): Future[T] = { 
    val promise = Promise[T]() 
    executor.schedule(new Runnable { 
    override def run() = { 
     promise.complete(Try(operation)) 
    } 
    }, by.length, by.unit) 
    promise.future 
} 

Future.firstCompletedOf(Seq(f, withDelay(println("still going"))(30 seconds))) 
Future.firstCompletedOf(Seq(f, withDelay(println("still still going"))(60 seconds))) 
0

事情是這樣的,也許:

object PimpMyFuture { 
    implicit class PimpedFuture[T](val f: Future[T]) extends AnyVal { 
     def after(delay: FiniteDuration)(callback: => Unit): Future[T] = { 
      Future { 
      blocking { Await.ready(f, delay) } 
      } recover { case _: TimeoutException => callback } 
      f 
     } 
    } 
    } 

    import PimpMyFuture._ 
    Future { Thread.sleep(10000); println ("Done") } 
    .after(5.seconds) { println("Still going") } 

這個實現很簡單,但它基本上雙打您需要的線程數 - 每個活躍的未來都有效地佔據兩條線索 - 這有點浪費。或者,您可以使用計劃任務使您的等待無阻塞。我不知道Scala中的一個「標準」調度(每個LIB都有自己的),但對於這樣一個簡單的任務,你可以使用Java的TimerTask直接:

object PimpMyFutureNonBlocking {  
val timer = new java.util.Timer 

implicit class PimpedFuture[T](val f: Future[T]) extends AnyVal { 
    def after(delay: FiniteDuration)(callback: => Unit): Future[T] = { 
     val task = new java.util.TimerTask { 
      def run() { if(!f.isCompleted) callback } 
     } 
     timer.schedule(task, delay.toMillis) 
     f.onComplete { _ => task.cancel } 
     f 
    } 
    } 
}