2016-04-21

RxJava Observable alternative to create in async call

I watched this talk: https://www.youtube.com/watch?v=QdmkXL7XikQ&feature=youtu.be&t=274

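In it I heard that I should avoid creating an Observable with the create method, because it does not handle unsubscription and backpressure automatically. However, I can't find an alternative for the code below.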

compositeSubscription.add(
    Observable.create(new Observable.OnSubscribe<DTOCompaniesCallback>() {
        @Override
        public void call(final Subscriber<? super DTOCompaniesCallback> subscriber) {

            modelTrainStrike.getCompaniesFromServer(new CompaniesCallback() {
                @Override
                public void onResult(DTOCompaniesCallback dtoCompaniesCallback) {
                    try {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(dtoCompaniesCallback);
                            subscriber.onCompleted();
                        }
                    } catch (Exception e) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onError(e);
                        }
                    }
                }
            });

        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<DTOCompaniesCallback>() {
        @Override
        public void call(DTOCompaniesCallback dtoCompaniesCallback) {
            Log.i("TAG", "onResult: " + dtoCompaniesCallback.getCompaniesList().size());
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            throw new OnErrorNotImplementedException("Source!", throwable);
        }
    })
);

I call clear on the CompositeSubscription in the onDestroy method:

@Override
public void onDestroy() {
    if (compositeSubscription != null) {
        compositeSubscription.clear();
    }
}

Do you see any alternative to the create method that I could use here? Do you see any potential danger, or is this approach safe? Thanks.

Answer


You can use defer + AsyncSubject:

Observable.defer(() -> {
    AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
    modelTrainStrike.getCompaniesFromServer(v -> {
        async.onNext(v);
        // RxJava 1.x names the terminal call onCompleted(), matching the question's code
        async.onCompleted();
    });
    return async;
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
...

If getCompaniesFromServer supports cancellation, you can do this:

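Observable.defer(() -> {
    AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
    Closeable c = modelTrainStrike.getCompaniesFromServer(v -> {
        async.onNext(v);
        // RxJava 1.x names the terminal call onCompleted(), matching the question's code
        async.onCompleted();
    });
    // Cancel the underlying call when the subscriber unsubscribes
    return async.doOnUnsubscribe(() -> {
        try { c.close(); } catch (IOException ex) { }
    });
})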
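
Why this works without create: defer runs the lambda once per subscriber, so the server call only starts on subscription, and AsyncSubject remembers the last value and hands it out once the source completes, even to a subscriber that attaches after the callback has already fired. Below is a rough plain-Java model of that caching behaviour, using only the JDK. It is a sketch for illustration, not the RxJava class: `LastValueCell` and its method names are made up here, and it ignores errors, unsubscription and schedulers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of AsyncSubject-like semantics (hypothetical class, not part of RxJava). */
final class LastValueCell<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private T last;
    private boolean hasValue;
    private boolean completed;

    /** Remember the most recent value; nothing is delivered yet. */
    public synchronized void onNext(T value) {
        if (completed) {
            return; // values after completion are ignored
        }
        last = value;
        hasValue = true;
    }

    /** Mark the cell as done and deliver the remembered value to everyone waiting. */
    public void onCompleted() {
        List<Consumer<T>> toNotify;
        synchronized (this) {
            if (completed) {
                return; // completing twice has no further effect
            }
            completed = true;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        if (hasValue) {
            for (Consumer<T> listener : toNotify) {
                listener.accept(last);
            }
        }
    }

    /** Early subscribers wait for completion; late subscribers get the cached value at once. */
    public void subscribe(Consumer<T> listener) {
        synchronized (this) {
            if (!completed) {
                listeners.add(listener);
                return;
            }
        }
        if (hasValue) {
            listener.accept(last);
        }
    }
}
```

In this model a listener registered before onCompleted sees nothing until completion and then receives only the final value, while a listener registered afterwards receives that same value immediately. That is the property that makes it safe for the server callback to fire before or after the downstream subscriber is fully wired up.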

Works great, thanks :)