I receive the following JSON data from RabbitMQ in Apache Spark (Spark SQL, group by):
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:30","data":{"RunStatus":1}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":3}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:40","data":{"RunStatus":2}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":1}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:55","data":{"RunStatus":3}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:22:00","data":{"RunStatus":2}}
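I would like to get the duration the device spent in each RunStatus, based on the JSON data above. For device MACH-101, for example, the RunStatus durations would look like this:
In RunStatus 1 the device stayed for 5 seconds (30 - 35)
In RunStatus 2 the device stayed for 5 seconds (40 - 45)
In RunStatus 3 the device stayed for 10 seconds (35 - 40 + 45 - 50)
The same logic applies to the second device's data.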
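The rule described above (each status lasts from its own timestamp until the next message of the same device, and the last message of a device contributes nothing) can be sketched in plain Java without Spark. This is only an illustration under assumptions: the class `RunStatusDurations`, the holder `Reading`, and the method `durations` are hypothetical names, and the messages are assumed to be already parsed.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RunStatusDurations {
    // Same timestamp pattern as in the sample messages.
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");

    // Hypothetical holder for one already-parsed message.
    static final class Reading {
        final String deviceId;
        final LocalDateTime time;
        final int runStatus;

        Reading(String deviceId, String timeStamp, int runStatus) {
            this.deviceId = deviceId;
            this.time = LocalDateTime.parse(timeStamp, FMT);
            this.runStatus = runStatus;
        }
    }

    // Returns deviceId -> (runStatus -> total seconds spent in that status).
    static Map<String, Map<Integer, Long>> durations(List<Reading> readings) {
        List<Reading> sorted = new ArrayList<>(readings);
        sorted.sort(Comparator.comparing((Reading r) -> r.deviceId).thenComparing(r -> r.time));

        Map<String, Map<Integer, Long>> result = new TreeMap<>();
        for (int i = 0; i + 1 < sorted.size(); i++) {
            Reading cur = sorted.get(i);
            Reading next = sorted.get(i + 1);
            if (!cur.deviceId.equals(next.deviceId)) {
                continue; // last reading of a device has no successor
            }
            // Credit the gap to the status that was active during it.
            long secs = Duration.between(cur.time, next.time).getSeconds();
            result.computeIfAbsent(cur.deviceId, k -> new TreeMap<>())
                  .merge(cur.runStatus, secs, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Reading> sample = Arrays.asList(
            new Reading("MACH-101", "29-06-2017 15:21:30", 1),
            new Reading("MACH-101", "29-06-2017 15:21:35", 3),
            new Reading("MACH-101", "29-06-2017 15:21:40", 2),
            new Reading("MACH-101", "29-06-2017 15:21:45", 3),
            new Reading("MACH-101", "29-06-2017 15:21:50", 2));
        System.out.println(durations(sample)); // {MACH-101={1=5, 2=5, 3=10}}
    }
}
```

The key point is that each row is paired only with its immediate successor for the same device. In Spark SQL the same pairing could probably be expressed with the `lead` window function partitioned by DeviceId and ordered by timestamp, though that variant is not tested here.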
Below is the Apache Spark SQL query I tried, but I am not getting the desired result. Please suggest alternatives; I don't mind doing this in a non-SQL way.
public static void main(String[] args) {
    try {
        mconf = new SparkConf();
        mconf.setAppName("RabbitMqReceiver");
        mconf.setMaster("local[*]");
        jssc = new JavaStreamingContext(mconf, Durations.seconds(10));
        SparkSession spksess = SparkSession
                .builder()
                .master("local[*]")
                .appName("RabbitMqReceiver2")
                .getOrCreate();
        SQLContext sqlctxt = new SQLContext(spksess);
        JavaDStream<String> strmData = jssc.receiverStream(new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));
        JavaDStream<String> machineData = strmData.window(Durations.minutes(1), Durations.seconds(10));
        sqlctxt.udf().register("custdatediff", new UDF2<String, String, String>() {
            @Override
            public String call(String argdt1, String argdt2) {
                DateTimeFormatter formatter = DateTimeFormat.forPattern("dd-MM-yyyy HH:mm:ss");
                DateTime dt1 = formatter.parseDateTime(argdt1);
                DateTime dt2 = formatter.parseDateTime(argdt2);
                Seconds retsec = org.joda.time.Seconds.secondsBetween(dt2, dt1);
                return retsec.toString();
            }
        }, DataTypes.StringType);
        machineData.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) {
                if (!rdd.isEmpty()) {
                    Dataset<Row> df = sqlctxt.jsonRDD(rdd);
                    df.createOrReplaceTempView("DeviceData");
                    // I DONT WANT to GROUP by timestamp, but query requires I pass it.
                    Dataset<Row> searchResult = sqlctxt.sql("select t1.DeviceId,t1.data.runstatus,"
                            + " custdatediff(CAST((t1.timestamp) as STRING),CAST((t2.timestamp) as STRING)) as duration from DeviceData t1"
                            + " join DeviceData t2 on t1.DeviceId = t2.DeviceId group by t1.DeviceId,t1.data.runstatus,t1.timestamp,t2.timestamp");
                    searchResult.show();
                }
            }
        });
        jssc.start();
        jssc.awaitTermination();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
A sample result from executing the above code/SQL is as follows:
+--------+---------+--------+
|DeviceId|runstatus|duration|
+--------+---------+--------+
| NTC-167| 2| PT0S|
| NTC-168| 2| PT0S|
| NTC-168| 2| PT-10S|
| NTC-168| 2| PT-15S|
| NTC-168| 1| PT10S|
| NTC-168| 1| PT0S|
| NTC-168| 1| PT-5S|
| NTC-168| 1| PT15S|
| NTC-168| 1| PT5S|
| NTC-168| 1| PT0S|
+--------+---------+--------+
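As you can see, the statuses are repeated, and among the repeated rows one of them has the correct result. The query I wrote forces me to group by timestamp as well; I think the results might be correct if I could avoid grouping by timestamp, but I'm not sure about this.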
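A side note on the duration column: values such as PT-10S appear because the UDF returns `Seconds.toString()`, which in Joda-Time yields an ISO-8601 period string rather than a number, so the values cannot be summed directly. A minimal sketch of a helper that returns plain seconds is shown below. It uses `java.time` instead of Joda-Time purely to keep the example dependency-free, and the class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

class SecondsBetween {
    // Same timestamp pattern as in the question's UDF.
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");

    // Seconds from 'earlier' to 'later' as a plain number
    // (negative if the arguments are passed in reverse order).
    static long secondsBetween(String earlier, String later) {
        LocalDateTime start = LocalDateTime.parse(earlier, FMT);
        LocalDateTime end = LocalDateTime.parse(later, FMT);
        return ChronoUnit.SECONDS.between(start, end);
    }

    public static void main(String[] args) {
        // Prints 5
        System.out.println(secondsBetween("29-06-2017 15:21:35", "29-06-2017 15:21:40"));
    }
}
```

With a numeric return type the per-status values could be aggregated with an ordinary sum, assuming each row is paired only with the next row of the same device.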
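1. Use Structured Streaming. I haven't heard of a RabbitMQ format for it, but I think writing one would not be hard (since you already have one for Spark Streaming). 2. Could you elaborate on "but I am not getting the desired result"? What do you get? Edit the question to get more comments. Thanks!
An update has been added to the main question.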