2015-02-06 261 views
3

我有以下代碼(*),它使用遞歸調用提供的observable的調度程序來實現輪詢。RxJava:調用onError而不完成/取消訂閱

(*)從https://github.com/ReactiveX/RxJava/issues/448

這是正常工作的啓發,當我只有通過onNext事件給用戶。但是,當我將onError事件傳遞給訂閱者時,會調用取消訂閱事件,這又會殺死調度程序。

我想也將錯誤傳遞給訂閱者。任何想法如何實現?

public Observable<Status> observe() { 
    return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS)); 
} 

private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> { 
    private Subscription subscription; 
    private Subscription innerSubscription; 
    private Scheduler.Worker worker = Schedulers.newThread().createWorker(); 

    private Observable<T> observable; 
    private long delayTime; 
    private TimeUnit unit; 

    public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) { 
     this.observable = observable; 
     this.delayTime = delayTime; 
     this.unit = unit; 
    } 

    @Override 
    public void call(final Subscriber<? super T> subscriber) { 
     subscription = worker.schedule(new Action0() { 
      @Override 
      public void call() { 
       schedule(subscriber, true); 
      } 
     }); 

     subscriber.add(Subscriptions.create(new Action0() { 
      @Override 
      public void call() { 
       subscription.unsubscribe(); 
       if (innerSubscription != null) { 
        innerSubscription.unsubscribe(); 
       } 
      } 
     })); 
    } 

    private void schedule(final Subscriber<? super T> subscriber, boolean immediately) { 
     long delayTime = immediately ? 0 : this.delayTime; 
     subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit); 
    } 

    private Action0 createInnerAction(final Subscriber<? super T> subscriber) { 
     return new Action0() { 
      @Override 
      public void call() { 
       innerSubscription = observable.subscribe(new Observer<T>() { 
        @Override 
        public void onCompleted() { 
         schedule(subscriber, false); 
        } 

        @Override 
        public void onError(Throwable e) { 
         // Doesn't work. 
         // subscriber.onError(e); 
         schedule(subscriber, false); 
        } 

        @Override 
        public void onNext(T t) { 
         subscriber.onNext(t); 
        } 
       }); 
      } 
     }; 
    } 
} 

回答

3

所以我一直玩這個一段時間,我不認爲這是可能的方式,你這樣做。調用onErroronCompleted可以終止流,翻轉SafeSubscriber包裝中的done標誌,並且沒有辦法重置它。

我可以看到2個選項可用 - 我認爲不是特別優雅,但會工作。

1 - UnsafeSubscribe。可能不是最好的主意,但它的工作原理,因爲它不是將Subscriber包裝在SafeSubscriber中,而是直接調用它。最好閱讀Javadoc,看看這對你是否合適。或者,如果您感覺冒險,請編寫您自己的SafeSubscriber,您可以在其中重置完成標誌或類似信息。以您爲例,請撥打電話:

observe.unsafeSubscribe(...) 

2 - 實施類似於this example的操作。我很感激它在C#中,但它應該是可讀的。簡單地說 - 你想創建一個Pair<T, Exception>類,然後,而不是調用onError,請致電onNext並設置您的配對的異常端。您的用戶必須更聰明地檢查這一對的每一邊,並且您可能需要在源ObservableObservable<Pair<T, Exception>>之間進行一些數據轉換,但我不明白爲什麼它不起作用。

如果有人有這樣做,我會真的很有興趣看到另一種做法。

希望這有助於

+0

你可以舉例unsafeSubscribe()嗎?一個簡單的方法調用並不能解決問題。 – Alexandr 2016-01-30 06:18:29

+0

@Alexandr這個答案已經過去了一段時間,自那以後API已經發生了很大的變化,所以這可能不再有效。此外,由OP記錄的解決方案可能更好。如果問題是相同的,我會使用該解決方案,或發佈一個新問題。 – Will 2016-01-30 07:19:31

3

由於@Will指出,你不能直接調用onError而不終止的觀察到。由於您只能撥打onNext,因此我決定使用Notification將值和throwable包裝在一個對象中。

import rx.*; 
import rx.functions.Action0; 
import rx.schedulers.Schedulers; 
import rx.subscriptions.Subscriptions; 

import java.util.concurrent.TimeUnit; 

public class PollingObservable { 
    public static <T> Observable<Notification<T>> create(Observable<T> observable, long delayTime, TimeUnit unit) { 
     return Observable.create(new OnSubscribePolling<>(observable, delayTime, unit)); 
    } 

    private static class OnSubscribePolling<T> implements Observable.OnSubscribe<Notification<T>> { 
     private Subscription subscription; 
     private Subscription innerSubscription; 
     private Scheduler.Worker worker = Schedulers.newThread().createWorker(); 

     private Observable<T> observable; 
     private long delayTime; 
     private TimeUnit unit; 

     private boolean isUnsubscribed = false; 

     public OnSubscribePolling(final Observable<T> observable, long delayTime, TimeUnit unit) { 
      this.observable = observable; 
      this.delayTime = delayTime; 
      this.unit = unit; 
     } 

     @Override 
     public void call(final Subscriber<? super Notification<T>> subscriber) { 
      subscription = worker.schedule(new Action0() { 
       @Override 
       public void call() { 
        schedule(subscriber, true); 
       } 
      }); 

      subscriber.onStart(); 
      subscriber.add(Subscriptions.create(new Action0() { 
       @Override 
       public void call() { 
        isUnsubscribed = true; 

        subscription.unsubscribe(); 
        if (innerSubscription != null) { 
         innerSubscription.unsubscribe(); 
        } 
       } 
      })); 
     } 

     private void schedule(final Subscriber<? super Notification<T>> subscriber, boolean immediately) { 
      if (isUnsubscribed) { 
       return; 
      } 

      long delayTime = immediately ? 0 : this.delayTime; 
      subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit); 
     } 

     private Action0 createInnerAction(final Subscriber<? super Notification<T>> subscriber) { 
      return new Action0() { 
       @Override 
       public void call() { 
        innerSubscription = observable.subscribe(new Observer<T>() { 
         @Override 
         public void onCompleted() { 
          schedule(subscriber, false); 
         } 

         @Override 
         public void onError(Throwable e) { 
          subscriber.onNext(Notification.<T>createOnError(e)); 
          schedule(subscriber, false); 
         } 

         @Override 
         public void onNext(T t) { 
          subscriber.onNext(Notification.createOnNext(t)); 
         } 
        }); 
       } 
      }; 
     } 
    } 
} 

要使用此功能,您可以直接使用的通知:

PollingObservable.create(service.getStatus(), 5, TimeUnit.SECONDS) 
    .subscribe(new Action1<Notification<Status>>() { 
     @Override 
     public void call(Notification<Status> notification) { 
      switch (notification.getKind()) { 
       case OnNext: 
        Status status = notification.getValue(); 
        // handle onNext event 
        break; 
       case OnError: 
        Throwable throwable = notification.getThrowable(); 
        // handle onError event 
        break; 
      } 
     } 
    }); 

或者您可以使用通知上的accept方法使用常規的可觀察:

PollingObservable.create(service.getStatus(), 5, TimeUnit.SECONDS) 
     .subscribe(new Action1<Notification<Status>>() { 
      @Override 
      public void call(Notification<Status> notification) { 
       notification.accept(statusObserver); 
      } 
     }); 

Observer<Status> statusObserver = new Observer<Status>() { 
    // ... 
} 

UPDATE 2015-02-24

看來輪詢可觀察者有時不能正常工作,因爲內部可觀察者即使在取消訂閱後也會調用onCompleteonError,從而重新安排自己。我添加了isUnsubscribed標誌來防止發生這種情況。

+0

真的很喜歡使用通知類 - 比使用Pair更清潔 – Will 2015-02-13 09:45:57