2016-12-03 73 views
0

我有兩個事件流:一個發出一個事件來指示一個項目的生命週期的開始,另一個流發出一個事件來指示一個項目的生命週期結束。 (視頻流可上itemId加入。)Flink發射事件當找不到匹配的事件對時

我怎麼能發出在弗林克每個itemId1只有有一個事件「壽命終點」的新事件,並沒有相應的開始? (這些開始和結束事件可能相隔幾小時或幾天。)

+0

在您的信息流中,「生命的開始」事件總是在「生命結束」事件之前始終有保證嗎? 所以,如果你看到一個「EOL」事件,那麼就不會有「事件」事件了。 –

+0

是的,我們可以假定事件總是正確排序 – epb

回答

1

您可以在KeyedStream上使用有狀態FlatMapFunction來實現此功能。

下面的代碼片段應該做你正在尋找的東西。

val stream1: DataStream[Event1] = ??? 
val stream2: DataStream[Event2] = ??? 

// map both streams to their ID and a isStart flag to have a common type 
val ids1: DataStream[(Int, Boolean)] = stream1.map(e => (e.id, true)) 
val ids2: DataStream[(Int, Boolean)] = stream2.map(e => (e.id, false)) 

// union both streams 
val ids = ids1.union(ids2) 

// use a stateful FlatMapFunction to check 
val onlyEOL: DataStream[Int] = ids 
    // organize stream by ID 
    .keyBy(_._1) 
    // use stateful FlatMapFunction to check that bol arrived before eol 
    .flatMapWithState { 
    (value: (Int, Boolean), state: Option[Boolean]) => 
     if (value._2) { 
     // bol event -> emit nothing and set state to true 
     (List(), Some(true)) 
     } else { 
     // eol event 
     if (state.isDefined && state.get) { 
      // bol was seen before -> emit nothing and remove state 
      (List(), None) 
     } else { 
      // bol was NOT seen before -> emit ID and remove state 
      (List(value._1), None) 
     } 
     } 
    }