1
我有研究和閱讀文件,他們不是很容易理解。 我想實現的是以下功能:彈簧反應堆項目的反應流背壓
我正在使用Spring Reactor項目並使用eventBus。我的事件總線正在向模塊A投擲事件。
模塊A應接收事件並插入到將保存唯一值的熱流中。每250個Milisecons溪流應拉動所有的價值,並在他們身上進行冥想......等等。
例如: 的eventBus拋出事件與數:1,2,3,2,3,2
的流應該得到和保持獨特的價值觀 - > 1,2,3 250之後毫秒該流應打印數字和空值
任何人有一個想法如何開始?我試過這些例子,但沒有任何真正的工作,我想我不明白的東西。任何人都有一個例子?
TNX
編輯:
當試圖做下我總是得到異常:
Stream<List<Integer>> s = Streams.wrap(p).buffer(1, TimeUnit.SECONDS);
s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));
for (int i = 0; i < 10000; i++) {
p.onNext(i);
}
例外:
java.lang.IllegalStateException: The environment has not been initialized yet
at reactor.Environment.get(Environment.java:156) ~[reactor-core-2.0.7.RELEASE.jar:?]
at reactor.Environment.timer(Environment.java:184) ~[reactor-core-2.0.7.RELEASE.jar:?]
at reactor.rx.Stream.getTimer(Stream.java:3052) ~[reactor-stream-2.0.7.RELEASE.jar:?]
at reactor.rx.Stream.buffer(Stream.java:2246) ~[reactor-stream-2.0.7.RELEASE.jar:?]
at com.ta.ng.server.controllers.user.UserController.getUsersByOrgId(UserController.java:70) ~[classes/:?]
正如你可以看到我無法繼續嘗試而不通過這個問題。
BY THE WAY:發生這種情況只有當我使用buffer(1, TimeUnit.SECONDS)
如果我使用buffer(50)
例如它的工作..雖然這不是最終的解決方案,它的一個開始。