2017-04-11 94 views
0

我是Flink的新手,非常抱歉,如果這個問題很瑣碎!連續分割/選擇

問題描述:。 IoT sensors --> MQTT --> Apache Nifi --> Kafka --> Flink(僅弗林克是relevante這裏

傳感器發送心跳和其他活動Hearbeats有兩種不同的格式(不同版本),我的目標是統一流

這裏是我的代碼:

SplitStream<GatewayEvent<String>> splitedStream = gatewayStream 
      .split(new GatewayActionPathSplitter()); 

// GatewayEvent contains an 'action' (mqqt topic): 'up/hearbeat' action contains v30 and v31 heartbeats 
SplitStream<GatewayEvent<String>> heartBeatStream = splitedStream 
      .select("up/heartbeat") 
      .split(new HeartBeatVersionSplitter()); 

// Map heartbeats v31 to HeartBeat objects 
DataStream<GatewayEvent<HeartBeat>> hb31Stream = heartBeatStream 
     .select("V31") 
     .map(new HeartBeat31Mapper()); 

// Map heartbeats v30 to HeartBeat objects 
DataStream<GatewayEvent<HeartBeat>> hb30Stream = heartBeatStream 
     .select("V30") 
     .map(new HeartBeat30Mapper()); 

DataStream<GatewayEvent<HeartBeat>> allHBStream = hb31Stream 
      .union(hb30Stream); 

allHBStream.print(); 

我以爲流將有下列事件(「HB30」是HeartBeatv30,「HB31」是HearBeatv31和「O」是其他事件)

gatewayStream = HB30, O, O, HB30, HB31, HB30, O
splitedStream = HB30, O, O, HB30, HB31, HB30, O
splitedStream.select("up/heartbeat") = HB30, HB30, HB31, HB30
heartBeatStream = HB30, HB30, HB31, HB30
heartBeatStream.select("V30") = HB30, HB30, HB30 //錯誤:還包括HB31
heartBeatStream.select("V31") = HB31 //錯誤:包含也HB30s

能有人給我解釋一下:
1 - 有什麼錯這個代碼?
2-是否有另一種解決方案來實現此功能? (我已經有一個解決方法:我的GatewayActionPathSplitter類包含了listenbeat版本的區別)。

感謝您的幫助

回答

1

不幸的是split操作不是可堆疊的,因爲他們只是採用策略路由以前的操作符的輸出。這個問題已經打開JIRA

另一種選擇是將區分邏輯合併到您的map函數中。有點像:

.filter(event.getType.equals("HB31") || event.getType.equals("HB30"))) 
.map(event => 
    event.getType match { 
    case "HB31" => new HeartBeat31Mapper() 
    case "HB30" => new HeartBeat30Mapper() 
    }) 
+0

謝謝Dawid,我會等待這個問題解決。在此期間,我將使用您的解決方法 –