我們在Spark中有一個用例,我們想從我們的數據庫中將歷史數據加載到Spark並不斷向Spark添加新的流數據,然後我們可以對整個最新數據集進行分析,最新的數據集。將流數據集附加到Spark中的批處理數據集中
據我所知,Spark SQL和Spark Streaming都不能將歷史數據與流數據結合起來。然後我發現了Spark 2.0中的結構化流式處理,似乎是爲了解決這個問題。但經過一些實驗後,我仍然無法弄清楚。這裏是我的代碼:
SparkSession spark = SparkSession
.builder()
.config(conf)
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// Load historical data from MongoDB
JavaMongoRDD<Document> mongordd = MongoSpark.load(jsc);
// Create typed dataset with customized schema
JavaRDD<JavaRecordForSingleTick> rdd = mongordd.flatMap(new FlatMapFunction<Document, JavaRecordForSingleTick>() {...});
Dataset<Row> df = spark.sqlContext().createDataFrame(rdd, JavaRecordForSingleTick.class);
Dataset<JavaRecordForSingleTick> df1 = df.as(ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));
// ds listens to a streaming data source
Dataset<Row> ds = spark.readStream()
.format("socket")
.option("host", "127.0.0.1")
.option("port", 11111)
.load();
// Create the typed dataset with customized schema
Dataset<JavaRecordForSingleTick> ds1 = ds
.as(Encoders.STRING())
.flatMap(new FlatMapFunction<String, JavaRecordForSingleTick>() {
@Override
public Iterator<JavaRecordForSingleTick> call(String str) throws Exception {
...
}
}, ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));
// ds1 and df1 have the same schema. ds1 gets data from the streaming data source, df1 is the dataset with historical data
ds1 = ds1.union(df1);
StreamingQuery query = ds1.writeStream().format("console").start();
query.awaitTermination();
我得到了一個錯誤「org.apache.spark.sql.AnalysisException:流媒體和批量DataFrames之間的聯盟/不支持數據集;」當我union()兩個數據集。
任何人都可以幫我嗎?我會走錯方向嗎?
在Spark 2.0中的結構化流是在Alpha中 - 很多東西還不支持。我想知道你是否不能使用有狀態流。在有狀態流媒體中,您可以使用歷史數據引導您的狀態,然後以您喜歡的方式添加流數據。有關詳細信息,請參閱此[Databrick的博客帖子](https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html)。 –
@GlennieHellesSindholt嗨Glennie,謝謝你的建議。我認爲mapWithState()最適合用新流媒體數據替換/更新當前狀態(鍵值對)。在我的使用案例中,我的RDD不是配對的關鍵值,也不需要更新舊數據。使用mapWithState()太多了嗎? –
我同意'mapWithState'不是明顯的選擇,如果你沒有任何聚合,但如果你不需要歷史數據,你爲什麼要在你的流? –