我是Flink的新手,非常抱歉,如果這個問題很瑣碎!連續分割/選擇
問題描述:。 IoT sensors --> MQTT --> Apache Nifi --> Kafka --> Flink
(僅弗林克是relevante這裏
傳感器發送心跳和其他活動Hearbeats有兩種不同的格式(不同版本),我的目標是統一流
這裏是我的代碼:
SplitStream<GatewayEvent<String>> splitedStream = gatewayStream
.split(new GatewayActionPathSplitter());
// GatewayEvent contains an 'action' (mqqt topic): 'up/hearbeat' action contains v30 and v31 heartbeats
SplitStream<GatewayEvent<String>> heartBeatStream = splitedStream
.select("up/heartbeat")
.split(new HeartBeatVersionSplitter());
// Map heartbeats v31 to HeartBeat objects
DataStream<GatewayEvent<HeartBeat>> hb31Stream = heartBeatStream
.select("V31")
.map(new HeartBeat31Mapper());
// Map heartbeats v30 to HeartBeat objects
DataStream<GatewayEvent<HeartBeat>> hb30Stream = heartBeatStream
.select("V30")
.map(new HeartBeat30Mapper());
DataStream<GatewayEvent<HeartBeat>> allHBStream = hb31Stream
.union(hb30Stream);
allHBStream.print();
我以爲流將有下列事件(「HB30」是HeartBeatv30,「HB31」是HearBeatv31和「O」是其他事件)
gatewayStream = HB30, O, O, HB30, HB31, HB30, O
splitedStream = HB30, O, O, HB30, HB31, HB30, O
splitedStream.select("up/heartbeat") = HB30, HB30, HB31, HB30
heartBeatStream = HB30, HB30, HB31, HB30
heartBeatStream.select("V30") = HB30, HB30, HB30
//錯誤:還包括HB31
heartBeatStream.select("V31") = HB31
//錯誤:包含也HB30s
能有人給我解釋一下:
1 - 有什麼錯這個代碼?
2-是否有另一種解決方案來實現此功能? (我已經有一個解決方法:我的GatewayActionPathSplitter類包含了listenbeat版本的區別)。
感謝您的幫助
謝謝Dawid,我會等待這個問題解決。在此期間,我將使用您的解決方法 –