2016-09-23 81 views
0

我試過下面FlowableOnBackpressureDrop忽略認購的請求方法

Flowable.interval(100L, TimeUnit.MILLISECONDS) 
    .onBackpressureDrop() 
    .observeOn(Schedulers.computation()) 
    .subscribe(new Subscriber<Long>() { 

     private Subscription subscription; 

     @Override 
     public void onSubscribe(Subscription subscription) { 
     this.subscription = subscription; 
     this.subscription.request(1L); 
     } 

     @Override 
     public void onNext(Long t) { 
     try { 
      Thread.sleep(300L); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     System.out.println(t); 

     subscription.request(1L); 
     } 

     ... 
    }); 

我的預期,我可能會得到這樣的0,3,6 ... 不過,我得到0,1,2,3 ... 因爲Flowable最初得到了request(Long.MAX_VALUE)

我檢查FlowableOnBackpressureDrop,我發現

@Override 
    public void onSubscribe(Subscription s) { 
     if (SubscriptionHelper.validate(this.s, s)) { 
      this.s = s; 
      actual.onSubscribe(this); 
      s.request(Long.MAX_VALUE); 
     } 
    } 

由於沒有使用我於請求方法中設置的值, 我認爲背壓不工作。

這是錯誤還是正確的操作?

我試過RxJava 2.0.0 RC2和RC3

回答

1

observeOn請求,因此沒有下降一段時間在一開始緩存128元的前期。如果你讓它運行128 * 300ms,它將開始跳過數值。您可以將預取量設置爲1,並且看到值相當快地下降。

+0

謝謝您的信息!現在,我知道原因是'observeOn'和如何解決。我加了1來觀察On的參數,我得到了我想要的。儘管我對背壓下降選項緩衝項目感到不舒服,因爲我不僅要關心請求方法,還要關心observeOn方法,RxJava 1.1的作用與此相同。所以這是從一開始的方式。我想知道有沒有人對此有任何疑問。 – arching