2015-03-19 89 views
0

假設我們再次有這樣的事情:實現循環作業重複不超過每秒

while (true) { 
    val job = Future { doSomething(); 1 } 
    val timeout = Future { Thread.sleep(1000); 2 } 

    val both = for (j <- job; t <- timeout) { 
    println("Done") 
    } 
    Await.result(both) 
} 

,這是什麼使用RX-的java /斯卡拉慣用的解決方案?

更新:更多的澄清,如果不是明顯的代碼。

TS ñ分別開始doSomething()工作的TE ň是時間戳。

然後下一個作業應在TS n + 1個 = MAX被調度(TE Ñ,TS Ñ + 1秒)

+1

['debounce'(http://reactivex.io/documentation/operators/debounce.html)運營商做這件事情。 – 2015-03-19 08:31:09

+0

所以你想安排新的工作,每次'doSomething()'完成後都要在1秒後運行? – 2015-03-20 09:01:02

+0

@jubis我在上面添加了一些更多的說明 – Tair 2015-03-20 13:22:45

回答

2

經過RxScala和Observables提供的所有可能性之後,我不得不說,這裏可能存在一個基本問題:觀察者的訂閱者不應該控制新值的發佈。可觀察事件是事件的來源,用戶是被動消費者。否則,例如,一個用戶可能會影響可觀測數據發送給其他用戶的輸出。

如果你仍然想使用observables這是我想出的最佳解決方案。它可以被觀察到並且定時器在一起,以便當計時器和作業都完成時它發出新的事件。

def run(job:() => Unit) { 

    val ready = Observable.create{ observer => 
    for(
     j <- future {job(); 1}; 
    ) observer.onNext(); 
    } 

    Observable.timer(1 seconds).zip(ready).subscribe{ value => 
    run(); 
    } 

} 

run(doSomenthing); 
+1

如果'doSomething()'需要更長的時間1秒?我不想重疊'doSomething()'調用。 – Tair 2015-03-19 14:04:09

+0

首先,由於「Rx承諾」的呼叫不會重疊。這是Rx庫如此有用的原因之一。 – 2015-03-19 14:50:14

+0

您的問題實際上並不相關,因爲如果您無法使用這些值,則不應創建此類計時器。但在其他一些情況下,您的問題將非常重要。基本上有兩種方法:忽略一些值或推遲它們。 – 2015-03-19 15:29:07

0

如果我正確地理解了這個問題,您需要做遞歸調度(因爲它似乎不會從作業中發出任何值)。下面是一個例子,如何用RxJava的Scheduler.Worker做到這一點:

public class RecurringJob { 
    static Subscription runJob(Runnable run) { 
     Scheduler.Worker w = Schedulers.newThread().createWorker(); 
     MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); 
     Action0 action = new Action0() { 
      @Override 
      public void call() { 
       try { 
        run.run(); 
        mas.set(w.schedule(this, 1, TimeUnit.SECONDS)); 
       } catch (Throwable t) { 
        t.printStackTrace(); 
        w.unsubscribe(); 
       } 
      } 
     }; 
     mas.set(w.schedule(action, 1, TimeUnit.SECONDS)); 
     return mas; 
    } 
    public static void main(String[] args) throws InterruptedException { 
     Subscription s = runJob(() -> System.out.println("Job")); 
     Thread.sleep(10000); 
     s.unsubscribe(); 
     System.out.println("Job stopped"); 
     Thread.sleep(3000); 
     System.out.println("Done."); 
    } 
}