我有一個可觀察的數據來自數據庫遊標的快速數據流。我正在考慮以每秒x項目的速度節制輸出。到目前爲止,我一直在使用上的文檔描述調用棧阻塞:速率限制可觀察到的
observable.map(f -> {
ratelimiter.acquire(); // configured limiter to only allow
});
這是工作正常,但只是出於好奇有沒有更好的方式來處理這個使用背壓?
韓國社交協會
我有一個可觀察的數據來自數據庫遊標的快速數據流。我正在考慮以每秒x項目的速度節制輸出。到目前爲止,我一直在使用上的文檔描述調用棧阻塞:速率限制可觀察到的
observable.map(f -> {
ratelimiter.acquire(); // configured limiter to only allow
});
這是工作正常,但只是出於好奇有沒有更好的方式來處理這個使用背壓?
韓國社交協會
使用sample
(throttleLast)操作:
Observable<T> throttled =
observable.sample(1/rate, TimeUnit.MILLISECONDS);
'sample'會丟失數據,我認爲這不是維尼尼斯想要的。 – Ztyx 2015-05-16 10:20:01
你可以嘗試使用rx.Observable#onBackpressureBuffer()
與自定義的用戶將定期要求n
項目每結合第二。但是,你將被綁定到硬件一秒鐘採樣。
注意.subscribeOn()
和.toBlocking()
只是使主要方法不立即退出。
public class BackpressureTest {
public static void main(final String[] args) {
Observable.range(1, 1000)
.compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it
.lift(allowPerSecond(3)) // via operator using custom subscriber request n items per second
.subscribeOn(Schedulers.computation())
.toBlocking()
.subscribe(System.out::println);
}
private static <T> Observable.Operator<T, T> allowPerSecond(final int n) {
return upstream -> periodicallyRequestingSubscriber(upstream, n);
}
private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) {
return new Subscriber<T>() {
@Override
public void onStart() {
request(0); // request 0 so that source stops emitting
Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items
}
@Override
public void onCompleted() {
upstream.onCompleted();
}
@Override
public void onError(final Throwable e) {
upstream.onError(e);
}
@Override
public void onNext(final T integer) {
upstream.onNext(integer);
}
};
}
}
@ michalsamek的答案似乎是正確的,儘管背壓只適用於Flowables。我已更正了他的訂閱者,以便它能夠完成要求的內容。
在不同時間爆發時使用它也有一個小問題。
private static <T> FlowableOperator<T, T> allowPerMillis(int millis) {
return observer -> new PeriodicallyRequestingSubscriber<>(observer, millis);
}
Observable.range(1, 100)
.observeOn(Schedulers.io())
.toFlowable(BackpressureStrategy.BUFFER)
.compose(Flowable::onBackpressureBuffer)
.lift(allowPerMillis(200))
.subscribe(value -> System.out.println(System.currentTimeMillis() % 10_000 + " : " + value));
public class PeriodicallyRequestingSubscriber<T> implements Subscriber<T> {
private final Subscriber<T> upstream;
private final int millis;
// If there hasn't been a request for a long time, do not flood
private final AtomicBoolean shouldRequest = new AtomicBoolean(true);
public PeriodicallyRequestingSubscriber(Subscriber<T> upstream, int millis) {
this.upstream = upstream;
this.millis = millis;
}
@Override
public void onSubscribe(Subscription subscription) {
Observable
.interval(millis, TimeUnit.MILLISECONDS)
.subscribe(x -> {
if (shouldRequest.getAndSet(false))
subscription.request(1);
});
}
@Override
public void onNext(T t) {
shouldRequest.set(true);
upstream.onNext(t);
}
@Override
public void onError(Throwable throwable) {
upstream.onError(throwable);
}
@Override
public void onComplete() {
upstream.onComplete();
}
}
你想'延遲'或'throttleFirst(throttleLast)'?如果收到物品太快,後者將丟棄物品。 – zsxwing 2014-11-17 06:39:07