0
我有兩個事件流:一個發出一個事件來指示一個項目的生命週期的開始,另一個流發出一個事件來指示一個項目的生命週期結束。 (視頻流可上itemId
加入。)Flink發射事件當找不到匹配的事件對時
我怎麼能發出在弗林克每個itemId1
該只有有一個事件「壽命終點」的新事件,並沒有相應的開始? (這些開始和結束事件可能相隔幾小時或幾天。)
我有兩個事件流:一個發出一個事件來指示一個項目的生命週期的開始,另一個流發出一個事件來指示一個項目的生命週期結束。 (視頻流可上itemId
加入。)Flink發射事件當找不到匹配的事件對時
我怎麼能發出在弗林克每個itemId1
該只有有一個事件「壽命終點」的新事件,並沒有相應的開始? (這些開始和結束事件可能相隔幾小時或幾天。)
您可以在KeyedStream
上使用有狀態FlatMapFunction
來實現此功能。
下面的代碼片段應該做你正在尋找的東西。
val stream1: DataStream[Event1] = ???
val stream2: DataStream[Event2] = ???
// map both streams to their ID and a isStart flag to have a common type
val ids1: DataStream[(Int, Boolean)] = stream1.map(e => (e.id, true))
val ids2: DataStream[(Int, Boolean)] = stream2.map(e => (e.id, false))
// union both streams
val ids = ids1.union(ids2)
// use a stateful FlatMapFunction to check
val onlyEOL: DataStream[Int] = ids
// organize stream by ID
.keyBy(_._1)
// use stateful FlatMapFunction to check that bol arrived before eol
.flatMapWithState {
(value: (Int, Boolean), state: Option[Boolean]) =>
if (value._2) {
// bol event -> emit nothing and set state to true
(List(), Some(true))
} else {
// eol event
if (state.isDefined && state.get) {
// bol was seen before -> emit nothing and remove state
(List(), None)
} else {
// bol was NOT seen before -> emit ID and remove state
(List(value._1), None)
}
}
}
在您的信息流中,「生命的開始」事件總是在「生命結束」事件之前始終有保證嗎? 所以,如果你看到一個「EOL」事件,那麼就不會有「事件」事件了。 –
是的,我們可以假定事件總是正確排序 – epb