我有以下代碼(*),它使用遞歸調用提供的observable的調度程序來實現輪詢。RxJava:調用onError而不完成/取消訂閱
(*)從https://github.com/ReactiveX/RxJava/issues/448
這是正常工作的啓發,當我只有通過onNext
事件給用戶。但是,當我將onError
事件傳遞給訂閱者時,會調用取消訂閱事件,這又會殺死調度程序。
我想也將錯誤傳遞給訂閱者。任何想法如何實現?
public Observable<Status> observe() {
return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS));
}
private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> {
private Subscription subscription;
private Subscription innerSubscription;
private Scheduler.Worker worker = Schedulers.newThread().createWorker();
private Observable<T> observable;
private long delayTime;
private TimeUnit unit;
public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) {
this.observable = observable;
this.delayTime = delayTime;
this.unit = unit;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
subscription = worker.schedule(new Action0() {
@Override
public void call() {
schedule(subscriber, true);
}
});
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
subscription.unsubscribe();
if (innerSubscription != null) {
innerSubscription.unsubscribe();
}
}
}));
}
private void schedule(final Subscriber<? super T> subscriber, boolean immediately) {
long delayTime = immediately ? 0 : this.delayTime;
subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit);
}
private Action0 createInnerAction(final Subscriber<? super T> subscriber) {
return new Action0() {
@Override
public void call() {
innerSubscription = observable.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
schedule(subscriber, false);
}
@Override
public void onError(Throwable e) {
// Doesn't work.
// subscriber.onError(e);
schedule(subscriber, false);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
});
}
};
}
}
你可以舉例unsafeSubscribe()嗎?一個簡單的方法調用並不能解決問題。 – Alexandr 2016-01-30 06:18:29
@Alexandr這個答案已經過去了一段時間,自那以後API已經發生了很大的變化,所以這可能不再有效。此外,由OP記錄的解決方案可能更好。如果問題是相同的,我會使用該解決方案,或發佈一個新問題。 – Will 2016-01-30 07:19:31