2016-09-22 83 views
0

任何迴應,我有這樣一個簡單的查詢:西提我沒有從彙總查詢

define stream myEventStream (userId string, price int); 
define stream outputStream (avgPrice double, userId string); 

@info(name = 'aQuery') 
from myEventStream#window.time(5000) 
select avg(price) as avgPrice, userId group by userId 
insert into outputStream; 

當我添加查詢回調,我不從它那裏得到任何迴應。

runtime.addCallback("aQuery", new QueryCallback() { 
     @Override 
     public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { 
      EventPrinter.print(timeStamp, inEvents, removeEvents); 
     } 
    }); 

我在另一個線程產生消息:

final AtomicInteger counter = new AtomicInteger(0); 
    final Random rnd = new Random(System.currentTimeMillis()); 
    ExecutorService executor = Executors.newSingleThreadExecutor(); 
    executor.submit(() -> { 
     while (counter.getAndIncrement() < 100) { 
      try { 
       handler.send(new Object[]{"user1", rnd.nextInt(100)}); 
       handler.send(new Object[]{"user2", rnd.nextInt(100)}); 
       handler.send(new Object[]{"user3", rnd.nextInt(100)}); 
       System.out.println("Sent: " + counter.get()); 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       throw new RuntimeException(e); 
      } 
     } 
    }); 

我期待導致每5秒。我在這裏錯過了什麼?請幫忙。 謝謝。

回答

0

那麼問題是,我已經關閉了運行時後,產生的事件流的代碼。儘管如此,我仍然可以向運行時發送消息。沒有例外。