2017-07-28 110 views
1

我有一個緩衝的流,等待的靜默時間預定量,發佈已經緩存元素的列表之前:如何動態縮放爆發排放流的去抖動?

INTEGERS 
    .share() 
    .buffer(INTEGERS.debounce(DEBOUNCE_TIME,TimeUnit.MILLISECONDS,scheduler)) 
    .map { durations -> 
     ... 
    } 

我想打DEBOUNCE_TIME動態取決於平均調整的緩衝項目,但我很難找出如何實現這一目標。

回答

1

你可以推遲去抖,一旦新的防抖動值已經確定利用它和觸發重複一個項目:

int DEBOUNCE_TIME = 100; 
AtomicInteger debounceTime = new AtomicInteger(DEBOUNCE_TIME); 
PublishSubject<Integer> mayRepeat = PublishSubject.create(); 

AtomicInteger counter = new AtomicInteger(); 

Observable<Integer> INTEGERS = 
     Observable.fromArray(10, 20, 200, 250, 300, 550, 600, 650, 700, 1200) 
     .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS) 
       .map(w -> counter.incrementAndGet())); 

INTEGERS.publish(o -> 
     o.buffer(
      Observable.defer(() -> 
       o.debounce(
        debounceTime.get(), TimeUnit.MILLISECONDS) 
      ) 
      .take(1) 
      .repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b)) 
     ) 
    ) 
    .map(list -> { 
     int nextDebounce = Math.min(100, list.size() * 100); 
     debounceTime.set(nextDebounce); 
     mayRepeat.onNext(1); 
     return list; 
    }) 
    .blockingSubscribe(System.out::println); 

此打印:

[1, 2] 
[3, 4, 5] 
[6, 7, 8, 9] 
[10] 
+0

在我的項目實施之後,我不得不說這非常酷,而且實際上效果很好。一旦我開始編寫一些測試,它幫助我確定,由於數據集的限制,我所要完成的核心並不是真正可行的。我實際上有點難過,我不再有用例來保證包括這個整潔的小模式。感謝您花時間回覆! :) – saganaut