2017-04-24 31 views
0

我想在一個Datastream中的兩個kafka主題之間進行連接。在一個數據流中匹配或加入來自兩個kafka主題的事件和規則

事實上,兩個數據流必須具有相同的ID才能進行連接。 事件是來自傳感器的數據,規則包含將用CEP(來自用戶界面)檢查的規則。

這是我的測試,但它不起作用,任何人都可以幫助我嗎?

DataStream<Object> evtAndRule=inputEventStream.join(rulesStream) 
      .where(new KeySelector<TrackEvent, Object>() { 
       @Override 
       public Object getKey(Event event) throws Exception { 
        return event.getId(); 
       } 
      }).equalTo(new KeySelector<RulesEvent, Object>() { 
       @Override 
       public Object getKey(RulesEvent rulesEvent) throws Exception { 
        return rulesEvent.getId(); 
       } 
      }).window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS))) 
      .apply(new FlatJoinFunction<TrackEvent, RulesEvent, Object>() { 
       @Override 
       public void join(TrackEvent trackEvent, RulesEvent rulesEvent, Collector<Object> collector) throws Exception { 
      .... 

       } 
      }); 

回答

0

我試過,但我不知道如何檢索所需的規則,如果這是最好的解決辦法

 DataStream<Tuple2<Event , RulesEvent>> evtAndRule= inputEventStream.map(new MapFunction<Event , Tuple2<Event , RulesEvent>>() { 
     @Override 
     public Tuple2<Event , RulesEvent> map(final Event event) throws Exception { 

      return new Tuple2<Event , RulesEvent>(event, new RulesEvent()); 
     } 
    }); 
相關問題