2015-02-08 101 views
5

如果我有一個Observalbe:RxJava Observable和Subscriber用於跳過異常?

List<Integer> ints = Lists.newArrayList(1, 2, 0, 3, 4); 
Observable<Integer> o1 = Observable.from(ints); 

我想12生成另一個觀察到,這鴻溝:

Observable<Integer> o2 = o1.map(i -> 12/i); 
o2.subscribe(
    v -> logger.info ("Subscriber value {}", v) , 
    t -> logger.error("Subscriber onError {} : {}", t.getClass() , t.getMessage()) 
); 

很明顯它會得到錯誤,當它遇到停止「0」:

RxTest - Subscriber value 12 
RxTest - Subscriber value 6 
RxTest - Subscriber onError class java.lang.ArithmeticException :/by zero 

但是如果我想讓觀察者(o2)跳過異常呢?

我看着RxJava的文檔error handling,沒有辦法跳過這個錯誤。 onErrorResumeNext()onExceptionResumeNext()需要備份/備份Observable,這不是我想要的。 onErrorReturn需要指定返回值。

所有三種錯誤處理方法都無法恢復原始觀察者。例如:

Observable<Integer> o2 = o1.map(i -> 12/i) 
     .onErrorReturn(t -> 0); 

它打印:

RxTest - Subscriber value 12 
RxTest - Subscriber value 6 
RxTest - Subscriber value 0 

不打印剩下的12/3 12/4和

唯一的解決辦法似乎繼電器在map功能:

Observable<Integer> o2 = o1.map(i -> { 
    try { 
    return Optional.of(12/i); 
    } catch (ArithmeticException e) { 
    return Optional.empty(); 
    } 
}).filter(Optional::isPresent) 
    .map(o -> (Integer) o.get()); 

它的工作,但它很麻煩。我不知道是否操縱Observable(如map

以上是關於Observable跳過異常時,有什麼辦法可以輕鬆地跳過任何RuntimeException。以下是有關在Subscriber跳過例外:

的情況是一樣的:

List<Integer> ints = Lists.newArrayList(1, 2, 0 , 3 , 4); 
Observable<Integer> o1 = Observable.from(ints); 
o1.subscribe(
    i -> logger.info("12/{} = {}", i, 12/i), 
    t -> logger.error("{} : {}", t.getClass() , t.getMessage()), 
() -> logger.info("onCompleted") 
); 

它打印出:

當例外onNext發生時,它會觸發onError和NOT響應Observable的任何數據。如果我希望訂戶吞下異常,我必須嘗試捕捉onNext()中的ArithmeticException。有沒有更清潔的解決方案?

看來當SubscriberonNext()面臨錯誤,在(onNext)內無法處理時,它應該停止,對吧?這是一個好的設計嗎?

+1

我看它的樣子,'onNext()'等同於'Iterator.next()' - 當有迭代集合的問題 - 將拋出一個異常,並迭代撈出(它不't「resume」iterating) - 例如,請參閱[ConcurrentModificationException](http://docs.oracle.com/javase/7/docs/api/java/util/ConcurrentModificationException.html)。這與我們在這裏的行爲是一樣的。也就是說,@benjchristensen或許能夠對這個問題有更多的瞭解 – alfasin 2015-02-08 05:27:20

+0

「可選」映射可能是我最喜歡的處理方式。這很清楚,過濾器很好地消化了事件。封裝它是一個很好的方法。我會在scala中使用'Try'類型,然後進行過濾。 – BeepDog 2018-02-21 21:49:31

回答

1

試試這個方法。

Observable<Integer> o2 = o1.map(i -> 12/i).onErrorFlatMap(e -> Observable.empty()); 

在上面的代碼中,錯誤事件被替換爲空的流,原始流正在恢復。因此跳過錯誤。

+0

對不起,在RxJava 1.0中沒有'Observable.onErrorFlatMap()'... – smallufo 2015-02-08 06:32:58

+0

哦,對不起,我沒有注意到版本。但是可以使用'materialize'和'dematerialize'來實現'onErrorFlatMap'類似的組合器,儘管有點困難。 – findall 2015-02-08 06:37:57

+0

我也很好奇,爲什麼這種方法在1.0中消失。 – smallufo 2015-02-08 06:39:32

5
Observable.just(1, 2, 3, 0, 4) 
      .flatMap(i -> Observable.defer(() -> Observable.just(12/i)) 
        .onErrorResumeNext(Observable.just(0))); 

它是一種方法,但請記住,RxJava假定錯誤是真正沒有得到解決的(您可以預期值爲0)。另一方面,如果你想忽略除以0的異常,也許你應該在分割之前過濾/映射你的值。

+0

我在這裏改變的唯一東西是 'Observable.from(1,2,3,0,4)'到'Observable.just(1,2,3,0,4)',因爲在1.0.0中它不會用'from'編譯。 – meddle 2015-02-09 08:50:05

0
import rx.lang.scala._ 

import scala.concurrent.duration.DurationDouble 
import scala.util.{Failure, Success, Try} 

val o1 = List(1, 2, 3, 0, 4) toObservable 
val o2 = List(1, 2, 3, 4, 5) toObservable 

val o3 = o1 zip o2 map {case (i1, i2) => i2/i1 } // this will trigger a division by 0 error 

val o4 = o3 lift { 
     subscriber: Subscriber[Try[Int]] => new Subscriber[Int](subscriber) { 
     override def onNext(v: Int) { 
      if (!subscriber.isUnsubscribed) 
      subscriber.onNext(Success(v)) 
     } 

     override def onError(e: Throwable) { 
      if (!subscriber.isUnsubscribed) 
      subscriber.onNext(Failure(e)) 
     } 

     override def onCompleted() { 
      if (!subscriber.isUnsubscribed) 
      subscriber.onCompleted() 
     } 
     } 
    } 

o4 subscribe(println(_))