3

我們在Spark中有一個用例,我們想從我們的數據庫中將歷史數據加載到Spark並不斷向Spark添加新的流數據,然後我們可以對整個最新數據集進行分析,最新的數據集。將流數據集附加到Spark中的批處理數據集中

據我所知,Spark SQL和Spark Streaming都不能將歷史數據與流數據結合起來。然後我發現了Spark 2.0中的結構化流式處理,似乎是爲了解決這個問題。但經過一些實驗後,我仍然無法弄清楚。這裏是我的代碼:

SparkSession spark = SparkSession 
      .builder() 
      .config(conf) 
      .getOrCreate(); 

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); 

    // Load historical data from MongoDB 
    JavaMongoRDD<Document> mongordd = MongoSpark.load(jsc); 


    // Create typed dataset with customized schema 
    JavaRDD<JavaRecordForSingleTick> rdd = mongordd.flatMap(new FlatMapFunction<Document, JavaRecordForSingleTick>() {...}); 
    Dataset<Row> df = spark.sqlContext().createDataFrame(rdd, JavaRecordForSingleTick.class); 
    Dataset<JavaRecordForSingleTick> df1 = df.as(ExpressionEncoder.javaBean(JavaRecordForSingleTick.class)); 


    // ds listens to a streaming data source 
    Dataset<Row> ds = spark.readStream() 
      .format("socket") 
      .option("host", "127.0.0.1") 
      .option("port", 11111) 
      .load(); 

    // Create the typed dataset with customized schema 
    Dataset<JavaRecordForSingleTick> ds1 = ds 
      .as(Encoders.STRING()) 
      .flatMap(new FlatMapFunction<String, JavaRecordForSingleTick>() { 
     @Override 
     public Iterator<JavaRecordForSingleTick> call(String str) throws Exception { 
     ... 
     } 
    }, ExpressionEncoder.javaBean(JavaRecordForSingleTick.class)); 


    // ds1 and df1 have the same schema. ds1 gets data from the streaming data source, df1 is the dataset with historical data 

    ds1 = ds1.union(df1); 
    StreamingQuery query = ds1.writeStream().format("console").start(); 
    query.awaitTermination(); 

我得到了一個錯誤「org.apache.spark.sql.AnalysisException:流媒體和批量DataFrames之間的聯盟/不支持數據集;」當我union()兩個數據集。

任何人都可以幫我嗎?我會走錯方向嗎?

+0

在Spark 2.0中的結構化流是在Alpha中 - 很多東西還不支持。我想知道你是否不能使用有狀態流。在有狀態流媒體中,您可以使用歷史數據引導您的狀態,然後以您喜歡的方式添加流數據。有關詳細信息,請參閱此[Databrick的博客帖子](https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html)。 –

+0

@GlennieHellesSindholt嗨Glennie,謝謝你的建議。我認爲mapWithState()最適合用新流媒體數據替換/更新當前狀態(鍵值對)。在我的使用案例中,我的RDD不是配對的關鍵值,也不需要更新舊數據。使用mapWithState()太多了嗎? –

+0

我同意'mapWithState'不是明顯的選擇,如果你沒有任何聚合,但如果你不需要歷史數據,你爲什麼要在你的流? –

回答

1

在支持這種類型的功能方面,我不能說MongoDB spark連接器,Google似乎沒有多少關於它的信息。但是,Spark數據庫生態系統中還有其他數據庫。我涵蓋了another answer中Spark數據庫生態系統中的大部分內容。我不能確切地說哪個數據庫容易地允許您查找的功能類型,但我知道SnappyDataMemSQL在該列表中。但是,您可能需要兩種關係形式的數據。