2015-07-21 80 views
0

我正在測試一個非常簡單的CEP查詢與外部定時窗口。查詢是define stream LoginEvents (timeStamp long, ip string, phone string); @info(name = 'query1') from LoginEvents#window.externalTime(timeStamp,5 sec) select timeStamp, ip insert all events into uniqueIps;;爲什麼所有事件都是散裝的而不是一個一個的?

看看單元測試here,我想會發生什麼是回調將被調用9次,5次傳入事件和4次過期。相反,它只被調用一次。爲什麼會這樣?我怎樣才能達到每個事件都會調用回調的狀態?

回答

1

這是我的實現。我不得不復制send方法,因爲它是私密的。您可以替換我的時間戳getter實現,因爲它對我的用例非常具體。

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.wso2.siddhi.core.event.ComplexEvent; 
import org.wso2.siddhi.core.event.ComplexEventChunk; 
import org.wso2.siddhi.core.event.Event; 
import org.wso2.siddhi.core.event.stream.StreamEvent; 
import org.wso2.siddhi.core.query.output.callback.QueryCallback; 
import java.util.Arrays; 

public abstract class CustomQueryCallback extends QueryCallback { 

    private static final Logger log = LoggerFactory.getLogger(CustomQueryCallback.class); 

    public void receiveStreamEvent(ComplexEventChunk complexEventChunk) { 
     while (complexEventChunk.hasNext()) { 
      ComplexEvent streamEvent = complexEventChunk.next(); 
      Event event = new Event(streamEvent.getOutputData().length).copyFrom(streamEvent); 
      Event[] events = new Event[]{event}; 
      long timestamp = (streamEvent.getType() == StreamEvent.Type.EXPIRED ? streamEvent.getTimestamp() : (long) streamEvent.getOutputData()[2]); 
      if (streamEvent.getType() == StreamEvent.Type.EXPIRED){ 
       send(timestamp, null, events); 
      } else { 
       send(timestamp, events, null); 
      } 
     } 
    } 

    private void send(long timeStamp, Event[] currentEvents, Event[] expiredEvents) { 
     try { 
      receive(timeStamp, currentEvents, expiredEvents); 
     } catch (RuntimeException e) { 
      log.error("Error on sending events" + Arrays.deepToString(currentEvents) + ", " + Arrays.deepToString(expiredEvents), e); 
     } 
    } 

} 
1

這裏所有的事件都是沒有任何時間延遲地發送給Siddhi的,因此Siddhi一起處理所有這些事件。這就是爲什麼這些事件是以散裝形式返回的。

如果您希望爲每個事件調用回調,那麼您必須擴展StreamCallback或QueryCallback,並迭代返回的事件數組並調用每個事件的回調。

+0

理想情況下,我希望能夠重寫send,但這是一種私有方法。 'receiveStreamEvent'是公開的,但有私人領域......有沒有其他可能的鉤子? – Johnny

+0

目前沒有,但我們會將其添加到路線圖中,以便用戶可以禁用批量事件處理。 – suho

相關問題