2017-02-20 36 views
0

在反應式Java中,我們被告知.subscribe()調用返回「一個訂閱參考」。但是Subscription是一個接口,不是一個類。那麼我們遞交的是什麼樣的對象來實現這個接口?我們是否有任何控制權?什麼樣的對象是反應式Java訂閱?

Subscriptions類可以創建和返回幾種不同種類的Subscription,但是用它們做什麼?如果我寫

Subscription mSub = Subscriptions.create(<some Action0>); 
mSub = someObservable.subscribe(); 

不會我剛剛創建的Subscription只需無論.subscribe()調用返回被改寫?你如何使用你創建的Subscription

(在一定程度上相關的說明,什麼是Subscriptions.unsubscribed()點,其中「返回訂閱到退訂什麼都不做,因爲它已經退訂。咦?)

回答

2

簡短的回答:你不應該在乎。

再回應:簽約爲您提供了兩種方法:

  • unsubscribe(),這將導致訂閱終止。
  • isUnsubscribed(),檢查是否已經發生。

您可以使用這些方法a)檢查Observable鏈是否終止,以及b)是否導致它過早終止,例如,如果用戶切換到不同的活動。

就是這樣。你不會接觸的目的。另外,你注意到沒有resubscribe方法嗎?這是因爲如果您想重新啓動操作,您需要重新訂閱Observable,爲您提供新的訂閱。

1

正如你所知道Subscription s的使用(例如在Android應用程序中,當您更改Activity(屏幕)時,請刷新舊的ActivityObservable s。在此方案中,Subscription實例由.subscribe()(如您所述)和所以,出於這個原因,我們會直接創建一個Subscription,特別是Subscriptions.unsubscribed() ?我遇到兩種情況:

  • 默認實現;避免像Subscription mSub;這樣的聲明,將填補後者,並可能創建一個NPE。如果您使用需要屬性初始化的Kotlin,則尤其如此。

  • 測試

+0

謝謝。問題:在活動重新啓動時,以某種方式存儲Android Activity訂閱是否允許重新連接到Observable?你能給個例子嗎?我如何在默認實現中使用創建的訂閱?我已經在Android應用程序中聲明瞭一些訂閱(對於RxAndroidBle),並且它似乎沒有引起問題。 –

+0

保持'Subscription'引用的主要用法是稍後調用'.dispose()'。所以,如果你不處理取消訂閱,只需刪除'訂閱'引用,寫'myObservable.subscribe();'而不分配給變量。一個典型的用法是在'onResume()'方法中訂閱,存儲'Subscription',並在'onPause()'中調用'subscription.dispose();' –

0

在一個有點相關的說明,什麼是Subscriptions.unsubscribed(),其中「返回訂閱到退訂什麼都不做,因爲它已經退訂。咦點?

在1 。x,Subscriptions.unsubscribed()用於返回一個Subscription實例操作已完成(或從未在第一個地方運行),當控制從RxJava返回到您的代碼。由於取消訂閱是無狀態和常量狀態,返回的Subscription是一個單例,因爲僅通過查看接口Subscription就沒有(合理的)方法來區分一個已完成/未訂閱的Subscription與另一個。

在2.x中,其等效接口有一個公共和內部版本,Disposable。內部版本主要用於替換已終止的Disposable,避免NullPointerException和空檢查,並有助於GC。

與他們做什麼?

通常你不需要擔心Subscriptions.create();它提供了你有你想要的資源附加到您的最終用戶的生命週期的情況下:

FileReader file = new FileReader ("file.txt"); 

readLines(file) 
.map(line -> line.length()) 
.reduce(0, (a, b) -> a + b) 
.subscribe(new Subscriber<Integer>() { 
    { 
     add(Subscriptions.create(() -> { 
      Closeables.closeSilently(file); // utility from Guava 
     }); 
    } 
    @Override public void onNext(Integer) { 
     // process 
    } 
    // onError(), onCompleted() 
}); 

這個例子,證明使用的一種方式,可以通過using,而不是仍然表示:

Observable.using(
    () -> new FileReader("file.txt"), // + try { } catch { } 
    file -> readLines(file).map(...).reduce(...), 
    file -> Closeables.closeSilently(file) 
) 
.subscribe(...)