0
我想在一個Datastream中的兩個kafka主題之間進行連接。在一個數據流中匹配或加入來自兩個kafka主題的事件和規則
事實上,兩個數據流必須具有相同的ID才能進行連接。 事件是來自傳感器的數據,規則包含將用CEP(來自用戶界面)檢查的規則。
這是我的測試,但它不起作用,任何人都可以幫助我嗎?
DataStream<Object> evtAndRule=inputEventStream.join(rulesStream)
.where(new KeySelector<TrackEvent, Object>() {
@Override
public Object getKey(Event event) throws Exception {
return event.getId();
}
}).equalTo(new KeySelector<RulesEvent, Object>() {
@Override
public Object getKey(RulesEvent rulesEvent) throws Exception {
return rulesEvent.getId();
}
}).window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
.apply(new FlatJoinFunction<TrackEvent, RulesEvent, Object>() {
@Override
public void join(TrackEvent trackEvent, RulesEvent rulesEvent, Collector<Object> collector) throws Exception {
....
}
});