2017-06-16 70 views
0

我有一個長時間運行的任務(說一個Observable<Integer>),我想在應用程序中儘可能多次觸發。我對處理以各種方式發送的事件的任務有多個「視圖」。我的整個應用程序中只有一個subscribe如何控制RxJava 2中的訂閱計數?

如何確保長時間運行的任務僅針對每個訂閱觸發一次,並且僅在訂閱需要時觸發?

爲了讓事情變得更具體,這裏是一個單元測試:

@Test 
public void testSubscriptionCount() { 

    final Counter counter = new Counter(); 

    // Some long running tasks that should be triggered once per subscribe 
    final Observable<Integer> a = Observable.just(1, 2, 3, 4, 5) 
     .doOnSubscribe(subscription -> { 
      counter.increment(); 
     }); 

    // Some "view" on the long running task 
    final Observable<Integer> b = a.filter(x -> x % 2 == 0); 

    // Another "view" on the long running task 
    final Observable<Integer> c = a.filter(x -> x % 2 == 1); 

    // A view on the views 
    final Observable<Integer> d = Observable.zip(b, c, (x, y) -> x + y); 

    d.toList().blockingGet(); 

    assertEquals(1, counter.count); // Fails, counter.count == 2 
} 

我想a當其意見(bcd)一個訂閱才能觸發,但每次訂閱也只有一次。

在上面的代碼中,訂閱發生兩次(我認爲d觸發bc,這兩個都獨立觸發a)。


添加.share()沒有解決不了的問題(雖然我認爲這是沿着正確的路線):

// Some long running tasks that should be triggered once per subscribe 
    final Observable<Integer> a = Observable.just(1, 2, 3, 4, 5) 
     .doOnSubscribe(subscription -> counter.increment()) 
     .share(); 

java.lang.AssertionError:

Expected :1

Actual :2

+1

我不清楚「只在需要時觸發」這個短語的意思。你有單元測試嗎? –

+0

我的意思是observable只能在有人直接訂閱它或者從它構建的其他observable時才運行它的發射器代碼。 – sdgfsdh

+0

你所描述的是觀察者的工作方式。你有什麼理由相信他們不這樣工作? –

回答

0

如果你的目標是防止多次執行,當觀察者訂閱平行,.share()是你在找什麼:

Observable<Integer> shared = source.share(); 

// In thread 1: 

shared.subscribe(...); 

// In thread 2: 

shared.subscribe(...); 

只要源觀察值在第二次訂閱發生時尚未完成,它將收到與第一次相同的結果,並且不會強制源觀察值的另一次執行。

RxJava文檔有更詳細的解釋,但它基本上是一個包裝器,它有一些引用計數,只有在需要時纔會訂閱源可觀察值以避免併發執行。

同時請記住,時間將扮演實際交付價值的重要部分。我不相信.share()會做任何特定的元素緩衝,所以如果元素在第二次訂閱之前交付,第二次訂閱不會獲得這些元素。您必須使用.buffer()或其他一些方法來保留晚期訂戶的結果。

+0

我認爲'.share()'是正確的想法,但它仍然給我兩個訂閱(見編輯)。 – sdgfsdh

+0

是的,它會給你兩個訂閱,因爲你的操作不是長時間運行。如果在.doOnSubscribe之前添加一個.delay(1L,TimeUnit.SECONDS),它完全符合你的要求。共享旨在防止併發執行。在你的情況下,它會很快完成兩次執行的共享。 – Matt

+0

我明白了。有沒有像'share'這樣的變體,但是在第一次觸發後沒有再次訂閱? – sdgfsdh