2017-06-29

I am receiving the following JSON data from RabbitMQ into Apache Spark SQL (and trying to group by):

{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:30","data":{"RunStatus":1}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":3}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:40","data":{"RunStatus":2}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2}}

{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":1}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:55","data":{"RunStatus":3}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:22:00","data":{"RunStatus":2}}

From the incoming JSON data I want to get, for each device, the duration it spent in each RunStatus. So for the data above, for device MACH-101, the RunStatus durations would look like this:

The device was in RunStatus 1 for - 5 seconds (30 - 35)
The device was in RunStatus 2 for - 5 seconds (40 - 45)
The device was in RunStatus 3 for - 10 seconds (35 - 40 + 45 - 50)

The same logic applies to the second device's data.
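The duration logic described above can be sketched outside Spark: sort a device's readings by timestamp, attribute each gap between consecutive readings to the status the device was in at the start of the gap, and sum per status. A minimal plain-Java sketch (the class, method, and event layout are mine, not from the question):

```java
import java.util.Map;
import java.util.TreeMap;

public class RunStatusDuration {

    // events: rows of {secondOfMinute, runStatus}, already sorted by time.
    // Each gap is attributed to the status at the start of the gap; the
    // last reading has no successor, so it contributes nothing.
    public static Map<Integer, Integer> durations(int[][] events) {
        Map<Integer, Integer> total = new TreeMap<>();
        for (int i = 0; i < events.length - 1; i++) {
            int status = events[i][1];
            int gap = events[i + 1][0] - events[i][0];
            total.merge(status, gap, Integer::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        // MACH-101 readings from the sample above: (second, RunStatus)
        int[][] mach101 = {{30, 1}, {35, 3}, {40, 2}, {45, 3}, {50, 2}};
        System.out.println(durations(mach101)); // {1=5, 2=5, 3=10}
    }
}
```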

Below is the Apache Spark SQL query I tried, but I am not getting the desired result. Please suggest some alternatives; I don't mind doing it in a non-SQL way. The results from executing the code/SQL below are shown further down.

public static void main(String[] args) {

    try {

        mconf = new SparkConf();
        mconf.setAppName("RabbitMqReceiver");
        mconf.setMaster("local[*]");

        jssc = new JavaStreamingContext(mconf, Durations.seconds(10));

        SparkSession spksess = SparkSession
            .builder()
            .master("local[*]")
            .appName("RabbitMqReceiver2")
            .getOrCreate();

        SQLContext sqlctxt = new SQLContext(spksess);

        JavaDStream<String> strmData =
            jssc.receiverStream(new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));

        JavaDStream<String> machineData =
            strmData.window(Durations.minutes(1), Durations.seconds(10));

        sqlctxt.udf().register("custdatediff", new UDF2<String, String, String>() {

            @Override
            public String call(String argdt1, String argdt2) {

                DateTimeFormatter formatter = DateTimeFormat.forPattern("dd-MM-yyyy HH:mm:ss");
                DateTime dt1 = formatter.parseDateTime(argdt1);
                DateTime dt2 = formatter.parseDateTime(argdt2);

                Seconds retsec = org.joda.time.Seconds.secondsBetween(dt2, dt1);
                return retsec.toString();
            }
        }, DataTypes.StringType);

        machineData.foreachRDD(new VoidFunction<JavaRDD<String>>() {

            @Override
            public void call(JavaRDD<String> rdd) {
                if (!rdd.isEmpty()) {

                    Dataset<Row> df = sqlctxt.jsonRDD(rdd);
                    df.createOrReplaceTempView("DeviceData");

                    // I DONT WANT to GROUP by timestamp, but query requires I pass it.

                    Dataset<Row> searchResult = sqlctxt.sql("select t1.DeviceId,t1.data.runstatus,"
                        + " custdatediff(CAST((t1.timestamp) as STRING),CAST((t2.timestamp) as STRING)) as duration from DeviceData t1"
                        + " join DeviceData t2 on t1.DeviceId = t2.DeviceId group by t1.DeviceId,t1.data.runstatus,t1.timestamp,t2.timestamp");

                    searchResult.show();
                }
            }
        });

        jssc.start();
        jssc.awaitTermination();

    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

The sample results are as below:

+--------+---------+--------+
|DeviceId|runstatus|duration|
+--------+---------+--------+
| NTC-167|        2|    PT0S|
| NTC-168|        2|    PT0S|
| NTC-168|        2|  PT-10S|
| NTC-168|        2|  PT-15S|
| NTC-168|        1|   PT10S|
| NTC-168|        1|    PT0S|
| NTC-168|        1|   PT-5S|
| NTC-168|        1|   PT15S|
| NTC-168|        1|    PT5S|
| NTC-168|        1|    PT0S|
+--------+---------+--------+

So as you can see, the statuses are repeated across the rows, some of them duplicates mixed in with the correct results. The query I wrote forces me to group by timestamp; I think that if I could avoid grouping by timestamp, the results might be correct... not sure about that.
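As an aside, the PT0S / PT-10S / PT15S strings in the duration column are ISO-8601 period notation: the custdatediff UDF returns Joda-Time's Seconds.toString(). The negative values appear because secondsBetween(dt2, dt1) is called with the later timestamp first for some of the joined pairs (the self-join produces pairs in both directions). java.time prints the same notation (illustration only, not code from the question):

```java
import java.time.Duration;

public class PeriodNotation {
    public static void main(String[] args) {
        // java.time.Duration prints the same ISO-8601 period notation that
        // Joda-Time's Seconds.toString() produces in the UDF above.
        System.out.println(Duration.ofSeconds(0));    // PT0S
        System.out.println(Duration.ofSeconds(15));   // PT15S
        System.out.println(Duration.ofSeconds(-10));  // PT-10S (dt2 after dt1)
    }
}
```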


1. Use Structured Streaming. I haven't heard of a RabbitMQ format, but I don't think it would be hard to write one (given you have it for Spark Streaming). 2. Can you elaborate on "but I am not getting the desired result"? What do you get? Edit the question for more. Thanks! –


An update has been added to the main question –

Answer


You can try using DataFrames and window functions. Using "lead" in a window function, you can compare the current row's timestamp with the next row's timestamp and find the difference for each device and run status. Like below,

val windowSpec_wk = Window.partitionBy(df1("DeviceID")).orderBy(df1("timestamp")) 
val df2 = df1.withColumn("period", lead(df1("timestamp"), 1).over(windowSpec_wk)) 
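For reference, here is what that lead-over-a-device-partition does, mimicked in plain Java: within each device's partition, each row is paired with the next row's timestamp, and the gap is attributed to that row's run status. This is only a sketch to show the logic (the class, record, and method names are mine); real Spark code would use org.apache.spark.sql.functions.lead as in the Scala snippet above:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LeadDemo {

    // One streamed reading: (deviceId, epochSecond, runStatus)
    record Reading(String deviceId, long ts, int status) {}

    // Simulates Window.partitionBy(DeviceID).orderBy(timestamp) + lead(timestamp, 1):
    // for each row, "lead" is the next row's timestamp within the same device,
    // and (lead - ts) is summed per (device, status).
    public static Map<String, Map<Integer, Long>> statusDurations(List<Reading> rows) {
        Map<String, List<Reading>> byDevice =
                rows.stream().collect(Collectors.groupingBy(Reading::deviceId));

        Map<String, Map<Integer, Long>> result = new TreeMap<>();
        for (var e : byDevice.entrySet()) {
            List<Reading> sorted = new ArrayList<>(e.getValue());
            sorted.sort(Comparator.comparingLong(Reading::ts));

            Map<Integer, Long> durs = new TreeMap<>();
            for (int i = 0; i < sorted.size() - 1; i++) {   // last row has no lead: skipped
                long lead = sorted.get(i + 1).ts();          // lead(timestamp, 1)
                durs.merge(sorted.get(i).status(), lead - sorted.get(i).ts(), Long::sum);
            }
            result.put(e.getKey(), durs);
        }
        return result;
    }

    public static void main(String[] args) {
        // Both devices from the sample data, seconds taken from the timestamps.
        List<Reading> rows = List.of(
                new Reading("MACH-101", 30, 1), new Reading("MACH-101", 35, 3),
                new Reading("MACH-101", 40, 2), new Reading("MACH-101", 45, 3),
                new Reading("MACH-101", 50, 2),
                new Reading("MACH-102", 35, 1), new Reading("MACH-102", 45, 3),
                new Reading("MACH-102", 50, 2), new Reading("MACH-102", 55, 3),
                new Reading("MACH-102", 60, 2));
        System.out.println(statusDurations(rows));
        // {MACH-101={1=5, 2=5, 3=10}, MACH-102={1=10, 2=5, 3=10}}
    }
}
```

Because the last row of each partition has a null lead, it is simply skipped here; in the DataFrame version the null "period" rows would be filtered out before summing.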

What I have done now is use the above idea to get the final set, from which I can run a simple Java function and get the result. It works fine. –
