2014-11-14 61 views
6

我有一個可觀察的數據來自數據庫遊標的快速數據流。我正在考慮以每秒x項目的速度節制輸出。到目前爲止,我一直在使用上的文檔描述調用棧阻塞:速率限制可觀察到的

observable.map(f -> { 
ratelimiter.acquire(); // configured limiter to only allow 
}); 

這是工作正常,但只是出於好奇有沒有更好的方式來處理這個使用背壓?

韓國社交協會

+0

你想'延遲'或'throttleFirst(throttleLast)'?如果收到物品太快,後者將丟棄物品。 – zsxwing 2014-11-17 06:39:07

回答

2

你可以嘗試使用rx.Observable#onBackpressureBuffer()與自定義的用戶將定期要求n項目每結合第二。但是,你將被綁定到硬件一秒鐘採樣。

注意.subscribeOn().toBlocking()只是使主要方法不立即退出。

public class BackpressureTest { 

    public static void main(final String[] args) { 
    Observable.range(1, 1000) 
     .compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it 
     .lift(allowPerSecond(3)) // via operator using custom subscriber request n items per second 
     .subscribeOn(Schedulers.computation()) 
     .toBlocking() 
     .subscribe(System.out::println); 
    } 

    private static <T> Observable.Operator<T, T> allowPerSecond(final int n) { 
    return upstream -> periodicallyRequestingSubscriber(upstream, n); 
    } 

    private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) { 
    return new Subscriber<T>() { 

     @Override 
     public void onStart() { 
     request(0); // request 0 so that source stops emitting 
     Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items 
     } 

     @Override 
     public void onCompleted() { 
     upstream.onCompleted(); 
     } 

     @Override 
     public void onError(final Throwable e) { 
     upstream.onError(e); 
     } 

     @Override 
     public void onNext(final T integer) { 
     upstream.onNext(integer); 
     } 
    }; 
    } 
} 
0

@ michalsamek的答案似乎是正確的,儘管背壓只適用於Flowables。我已更正了他的訂閱者,以便它能夠完成要求的內容。

在不同時間爆發時使用它也有一個小問題。

private static <T> FlowableOperator<T, T> allowPerMillis(int millis) { 
    return observer -> new PeriodicallyRequestingSubscriber<>(observer, millis); 
} 


Observable.range(1, 100) 
    .observeOn(Schedulers.io()) 
    .toFlowable(BackpressureStrategy.BUFFER) 
    .compose(Flowable::onBackpressureBuffer) 
    .lift(allowPerMillis(200)) 
    .subscribe(value -> System.out.println(System.currentTimeMillis() % 10_000 + " : " + value)); 



public class PeriodicallyRequestingSubscriber<T> implements Subscriber<T> { 

    private final Subscriber<T> upstream; 

    private final int millis; 

    // If there hasn't been a request for a long time, do not flood 
    private final AtomicBoolean shouldRequest = new AtomicBoolean(true); 

    public PeriodicallyRequestingSubscriber(Subscriber<T> upstream, int millis) { 
     this.upstream = upstream; 
     this.millis = millis; 
    } 

    @Override 
    public void onSubscribe(Subscription subscription) { 
     Observable 
       .interval(millis, TimeUnit.MILLISECONDS) 
       .subscribe(x -> { 
        if (shouldRequest.getAndSet(false)) 
         subscription.request(1); 
       }); 
} 

@Override 
public void onNext(T t) { 
    shouldRequest.set(true); 
    upstream.onNext(t); 
} 

@Override 
public void onError(Throwable throwable) { 
    upstream.onError(throwable); 
} 

@Override 
public void onComplete() { 
    upstream.onComplete(); 
} 
}