2017-09-21 61 views
0

我是RxJava的新手,所以我仍然試圖讓自己的頭靠近它。我有一個Observable代表一串按鈕點擊,所以它很熱。每次點擊該按鈕時,我都想做一些I/O。如果失敗,我想重複並嘗試再次執行該I/O,直到成功。這似乎是一個使用retry()repeat()的好機會,但那些只能用於熱門的可觀察對象,而不是冷的。正確的方式來重複發射事件流的操作

下面是一些僞代碼來獲得在我想要做的事:

buttonRequests 
    .map(actionEvent -> doIO()) 
    .repeatAboveIfFailedUntilIOSucceeds() 
    .subscribe(...); 

我使用flatMap複製的事件考慮,也比使用skip忽略其餘的,如果它成功了,但不會乾淨地給我一個不確定的嘗試次數。

什麼是正確的方式來思考這個問題?

+0

更多的例子你想重複一個失敗的動作爲每個按鈕點擊,直到成功?如果它正在重試並且新的遊戲會發生什麼? –

+0

是的,我會重試每個按鈕單擊操作,直到它成功。我會禁用該按鈕,以防止它在繼續時被重新點擊。 – Vultan

回答

1

請看看測試。在每個事件上,一個新的IO請求將被觸發。 Switch-Map就像Flat-Map一樣,但是當新的上游事件進入時,它將退訂最近的訂閱。如果你正在使用併發性,Flat-Map將會開始一個新的。因此,讓我們假設你的熱點observable激發了一個事件,flatMap開始在另一個線程(subscribeOn)上執行你的IO工作。如果有另一個事件進入,而最後一個事件仍在執行,它將開始執行另一個IO任務。 Switch-Map將取消訂閱最後一個,併爲當前事件啓動一個。讓我們看看retry() - 操作符。重試將重新訂閱'ioWorkWrapped'提供的觀察值,直到observable完成onComplete。這可能是非常危險的,因爲想象一下每次嘗試都會失敗。它會永遠旋轉。建議使用'exponential-backoff'並在X嘗試後提供備份可觀察失敗。對於「retryWhen」的使用,請看看這本優秀著作: Reactive Programming with RxJava

public class LibraryTest { 
    private AtomicInteger idx; 

    @Before 
    public void setUp() throws Exception { 
     idx = new AtomicInteger(0); 
    } 

    @Test 
    public void name() throws Exception { 
     Observable<String> stringObservable = Observable.just(1) 
       .switchMap(integer -> ioWorkWrapped() 
         .doOnError(throwable -> System.out.println("Something went wrong.")) 
         .retry() 
       ); 

     stringObservable.test() 
       .await() 
       .assertResult("value"); 


    } 

    private Observable<String> ioWorkWrapped() { 
     return Observable.defer(() -> { 
      try { 
       Thread.sleep(500); // IO Work 
       if (idx.getAndIncrement() < 5) { // for testing... 
        return Observable.error(new IllegalStateException("Wurst")); 
       } 
       return Observable.just("value"); 
      } catch (Exception ex) { 
       return Observable.error(ex); 
      } 
     }); 
    } 
} 
+0

謝謝!這正是我正在尋找的。我已經能夠將這個想法整合到我的代碼中;我也通過它學到了很多東西。 – Vultan

0

您需要使用運營商retryWhen的情況下,你的I/O操作失敗,你可以扔掉了在運營商籤一個Runnable例外。如果您遇到這種類型的異常,請重試。

在這個例子中,我們將重試4次。但是這種情況可以通過我們收到的可丟棄類型來改變。

int count=0; 

@Test 
public void retryWhenConnectionError() { 
    Subscription subscription = Observable.just(null) 
      .map(connection -> { 
       System.out.println("Trying to open connection"); 
       connection.toString(); 
       return connection; 
      }) 
      .retryWhen(errors -> errors.doOnNext(o -> count++) 
          .flatMap(t -> count > 3 ? Observable.error(t) : 
            Observable.just(null).delay(100, TimeUnit.MILLISECONDS)), 
        Schedulers.newThread()) 
      .subscribe(s -> System.out.println(s)); 
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); 
} 

您可以在這裏看到https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

+0

謝謝你我發現了另一個更好的符合我需求的回答,但我也花了一些時間來處理這個問題,這有助於提高我的理解。 – Vultan

相關問題