2016-03-09 68 views
2

這裏真的是一個片段,我想弄清楚:RxJava:緩衝項目,直到某些條件是當前項目

class RaceCondition { 

    Subject<Integer, Integer> subject = PublishSubject.create(); 

    public void entryPoint(Integer data) { 
     subject.onNext(data); 
    } 

    public void client() { 
     subject /*some operations*/ 
       .buffer(getClosingSelector()) 
       .subscribe(/*handle results*/); 
    } 

    private Observable<Integer> getClosingSelector() { 
     return subject /* some filtering */; 
    } 
} 

有一個Subject接受來自外部的事件。有一個客戶訂閱了這個主題,處理這些事件,也是buffer。這裏的主要想法是緩衝項目應該每次根據使用流中項目計算的條件發出。

爲此,緩衝區邊界本身監聽主體。

重要的期望行爲:無論何時邊界發出物品,它也應包含在buffer的以下發射中。當前配置不是這樣的情況(至少我認爲是這樣的)是從之前的關閉選擇器發出的,它到達buffer,因此它不包含在當前排放中,而是留在等待下一個一。

有沒有一種方法可以讓關閉選擇器等待首先緩衝的項目?如果沒有,是否有另一種方法來緩存和釋放基於下一個傳入項目的項目?

回答

2

如果我理解正確,你想緩衝,直到某些謂詞允許它基於項目。您可以使用一套複雜的操作員來完成此操作,但也許更容易編寫一個自定義操作員:

public final class BufferUntil<T> 
implements Operator<List<T>, T>{ 

    final Func1<T, Boolean> boundaryPredicate; 

    public BufferUntil(Func1<T, Boolean> boundaryPredicate) { 
     this.boundaryPredicate = boundaryPredicate; 
    } 

    @Override 
    public Subscriber<? super T> call(
      Subscriber<? super List<T>> child) { 
     BufferWhileSubscriber parent = 
       new BufferWhileSubscriber(child); 
     child.add(parent); 
     return parent; 
    } 

    final class BufferWhileSubscriber extends Subscriber<T> { 
     final Subscriber<? super List<T>> actual; 

     List<T> buffer = new ArrayList<>(); 

     /** 
     * @param actual 
     */ 
     public BufferWhileSubscriber(
       Subscriber<? super List<T>> actual) { 
      this.actual = actual; 
     } 

     @Override 
     public void onNext(T t) { 
      buffer.add(t); 
      if (boundaryPredicate.call(t)) { 
       actual.onNext(buffer); 
       buffer = new ArrayList<>(); 
      } 
     } 

     @Override 
     public void onError(Throwable e) { 
      buffer = null; 
      actual.onError(e); 
     } 

     @Override 
     public void onCompleted() { 
      List<T> b = buffer; 
      buffer = null; 
      if (!b.isEmpty()) { 
       actual.onNext(b); 
      } 
      actual.onCompleted(); 
     } 
    } 
} 
+0

非常感謝,大衛!這正是我所期待的。 – AndroidEx

+2

你能展示如何實現一個Observable作爲條件的'BufferUntil'嗎?緩衝區直到另一個觀察結束?我試圖得到這個工作,但只是不知道如何去做... – prom85