Spark RDD vs DataSet performance

I am new to Spark. I am trying to use Spark 2.1 for CEP purposes: detecting events that have gone missing within the last 2 minutes. I convert the received input into a JavaDStream of input events, then perform reduceByKeyAndWindow on inputEvents and run a Spark SQL query over the result.
JavaPairDStream<String, Long> reduceWindowed = inputEvents.reduceByKeyAndWindow(new MaxTimeFuntion(),
        Durations.seconds(124), new Duration(2000));

reduceWindowed.foreachRDD((rdd, time) -> {
    SparkSession spark = TestSparkSessionSingleton.getInstance(rdd.context().getConf());
    JavaRDD<EventData> rowRDD = rdd.map(new org.apache.spark.api.java.function.Function<Tuple2<String, Long>, EventData>() {
        @Override
        public EventData call(Tuple2<String, Long> javaRDD) {
            EventData record = new EventData();
            record.setId(javaRDD._1);
            record.setEventTime(javaRDD._2);
            return record;
        }
    });
    Dataset<Row> eventDataFrames = spark.createDataFrame(rowRDD, EventData.class);
    eventDataFrames.createOrReplaceTempView("events");
    Dataset<Row> resultRows = spark.sql(
        "select id, max(eventTime) as maxval from events group by id"
        + " having (unix_timestamp()*1000 - maxval >= 120000)");
});
I perform the same filtering using the RDD API:
JavaPairDStream<String, Long> filteredStream = reduceWindowed.filter(new Function<Tuple2<String, Long>, Boolean>() {
    @Override
    public Boolean call(Tuple2<String, Long> val) {
        return (System.currentTimeMillis() - val._2() >= 120000);
    }
});
filteredStream.print();
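For context, both variants are meant to apply the same 2-minute staleness predicate: the SQL `having` clause computes `unix_timestamp()*1000 - maxval >= 120000`, and the RDD filter computes `System.currentTimeMillis() - val._2() >= 120000`. Below is a minimal plain-Java sketch of that shared check, outside Spark; the class name `StalenessCheck`, the constant `STALE_MS`, and the helper `isStale` are illustrative names of mine, not part of my job:

```java
public class StalenessCheck {
    // 2-minute threshold in milliseconds, matching the 120000 used in both
    // the SQL having clause and the RDD filter above
    static final long STALE_MS = 120_000L;

    // An id is considered "missing" if its latest event timestamp is at
    // least STALE_MS older than the current time
    static boolean isStale(long lastEventTimeMs, long nowMs) {
        return nowMs - lastEventTimeMs >= STALE_MS;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(isStale(now - 180_000L, now)); // 3 minutes old -> true
        System.out.println(isStale(now - 30_000L, now));  // 30 seconds old -> false
    }
}
```

Both the SQL and RDD versions evaluate this predicate per key against the max event time produced by the window.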
Both approaches give me the same results, for the Dataset and the RDD.
Am I using Spark SQL correctly?
In local mode, at the same input rate, the Spark SQL query consumes noticeably more CPU than the RDD filter. Can anyone help me understand why Spark SQL uses more CPU than the RDD filter function?