2016-05-23 51 views
1

我有研究和閱讀文件,他們不是很容易理解。 我想實現的是以下功能:彈簧反應堆項目的反應流背壓

我正在使用Spring Reactor項目並使用eventBus。我的事件總線正在向模塊A投擲事件。

模塊A應接收事件並插入到將保存唯一值的熱流中。每250個Milisecons溪流應拉動所有的價值,並在他們身上進行冥想......等等。

例如: 的eventBus拋出事件與數:1,2,3,2,3,2

的流應該得到和保持獨特的價值觀 - > 1,2,3 250之後毫秒該流應打印數字和空值

任何人有一個想法如何開始?我試過這些例子,但沒有任何真正的工作,我想我不明白的東西。任何人都有一個例子?

TNX

編輯:

當試圖做下我總是得到異常:

 Stream<List<Integer>> s = Streams.wrap(p).buffer(1, TimeUnit.SECONDS); 

     s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); 

     for (int i = 0; i < 10000; i++) { 
      p.onNext(i); 
     } 

例外:

java.lang.IllegalStateException: The environment has not been initialized yet 
    at reactor.Environment.get(Environment.java:156) ~[reactor-core-2.0.7.RELEASE.jar:?] 
    at reactor.Environment.timer(Environment.java:184) ~[reactor-core-2.0.7.RELEASE.jar:?] 
    at reactor.rx.Stream.getTimer(Stream.java:3052) ~[reactor-stream-2.0.7.RELEASE.jar:?] 
    at reactor.rx.Stream.buffer(Stream.java:2246) ~[reactor-stream-2.0.7.RELEASE.jar:?] 
    at com.ta.ng.server.controllers.user.UserController.getUsersByOrgId(UserController.java:70) ~[classes/:?] 

正如你可以看到我無法繼續嘗試而不通過這個問題。

BY THE WAY:發生這種情況只有當我使用buffer(1, TimeUnit.SECONDS)如果我使用buffer(50)例如它的工作..雖然這不是最終的解決方案,它的一個開始。

回答

0

再次閱讀文檔後,嗯,我錯過了這一點:

static { 
     Environment.initialize(); 
    } 

這解決了這個問題。 Tnx