2017-04-10 96 views
1

我是Spark的新手。我正在嘗試將Spark 2.1版本用於CEP目的。 在最近2分鐘內檢測到丟失的事件。我將接收到的輸入轉換爲JavaDSStream的輸入事件,然後在inputEvents上執行reducebykeyandWindow並執行spark sql。Spark RDD vs DataSet性能

JavaPairDStream<String, Long> reduceWindowed = inputEvents.reduceByKeyAndWindow(new MaxTimeFuntion(), 
       Durations.seconds(124), new Duration(2000)); 
reduceWindowed.foreachRDD((rdd, time) -> { 
       SparkSession spark = TestSparkSessionSingleton.getInstance(rdd.context().getConf()); 
       JavaRDD<EventData> rowRDD = rdd.map(new org.apache.spark.api.java.function.Function<Tuple2<String,Long>, EventData>() { 
        @Override 
        public EventData call(Tuple2<String, Long> javaRDD) { 
        { 
          EventData record = new EventData(); 
          record.setId(javaRDD._1); 
          record.setEventTime(javaRDD._2); 
          return record;    
        } 
       }) 
    Dataset<Row> eventDataFrames = spark.createDataFrame(rowRDD, EventData.class); 
    eventDataFrames.createOrReplaceTempView("checkins"); 


Dataset<Row> resultRows=       
        spark.sql("select id, max(eventTime) as maxval, from events group by id having (unix_timestamp()*1000 - maxval >= 120000)"); 

相同的過濾我執行使用RDD功能:

JavaPairDStream<String, Long> filteredStream = reduceWindowed.filter(new Function<Tuple2<String,Long>, Boolean>() { 

     public Boolean call(Tuple2<String,Long> val) 
     { 
      return (System.currentTimeMillis() - val._2() >= 120000); 
     } 
    }); 

    filteredStream.print(); 

無論是方法提供我相同的結果爲數據集& RDD。

我是否正確使用Spark sql?

在本地模式下,對於相同的輸入速率,Spark SQL查詢執行消耗的CPU相對高於RDD函數。誰能幫助我瞭解爲什麼SQL星火相比消耗RDD過濾功能比較高的CPU ..

回答

1

星火SQL使用催化劑(SQL優化器),它的作用:

SQL的
  1. 分析查詢
  2. 使一些邏輯優化
  3. 添加一些物理規劃
  4. 生成一些代碼

d ata在內部設置外部JVM對象的行。可以使用類型安全+快速。比DataFrames慢,不適合交互式分析。 Dataset API作爲API預覽發佈在Spark 1.6中,旨在提供兩全其美;熟悉的面向對象編程風格和編譯時類型安全性的RDD API,但具有Catalyst查詢優化器的性能優勢。數據集也使用與DataFrame API相同的高效堆外存儲機制。

RDD,另一方面,僅僅是一個彈性分佈式數據集是比較數據的黑盒不能作爲可以針對它要執行的操作進行優化的,並不像約束。