這是我的實現。我不得不復制send
方法,因爲它是私密的。您可以替換我的時間戳getter實現,因爲它對我的用例非常具體。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import java.util.Arrays;
public abstract class CustomQueryCallback extends QueryCallback {
private static final Logger log = LoggerFactory.getLogger(CustomQueryCallback.class);
public void receiveStreamEvent(ComplexEventChunk complexEventChunk) {
while (complexEventChunk.hasNext()) {
ComplexEvent streamEvent = complexEventChunk.next();
Event event = new Event(streamEvent.getOutputData().length).copyFrom(streamEvent);
Event[] events = new Event[]{event};
long timestamp = (streamEvent.getType() == StreamEvent.Type.EXPIRED ? streamEvent.getTimestamp() : (long) streamEvent.getOutputData()[2]);
if (streamEvent.getType() == StreamEvent.Type.EXPIRED){
send(timestamp, null, events);
} else {
send(timestamp, events, null);
}
}
}
private void send(long timeStamp, Event[] currentEvents, Event[] expiredEvents) {
try {
receive(timeStamp, currentEvents, expiredEvents);
} catch (RuntimeException e) {
log.error("Error on sending events" + Arrays.deepToString(currentEvents) + ", " + Arrays.deepToString(expiredEvents), e);
}
}
}
理想情況下,我希望能夠重寫send,但這是一種私有方法。 'receiveStreamEvent'是公開的,但有私人領域......有沒有其他可能的鉤子? – Johnny
目前沒有,但我們會將其添加到路線圖中,以便用戶可以禁用批量事件處理。 – suho