我有一個Spark任務,我需要在每個微批處理中寫入SQL查詢的輸出。寫入操作是一項昂貴的操作,導致批處理執行時間超過批處理間隔。如何在Spark Streaming應用程序中異步寫入行以加快批處理執行速度?
我正在尋找提高寫入性能的方法。
正在單獨的線程中執行寫操作,如下面的一個好選項所示?
這是否會導致任何副作用,因爲Spark本身以分佈式方式執行?
是否有其他更好的方法來加速寫入?
// Create a fixed thread pool to execute asynchronous tasks val executorService = Executors.newFixedThreadPool(2) dstream.foreachRDD { rdd => import org.apache.spark.sql._ val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate import spark.implicits._ import spark.sql val records = rdd.toDF("record") records.createOrReplaceTempView("records") val result = spark.sql("select * from records") // Submit a asynchronous task to write executorService.submit { new Runnable { override def run(): Unit = { result.write.parquet(output) } } } }
感謝您的詳細解釋!這絕對有幫助。將嘗試改進性能的替代選項。 – vijay
@vijay你的問題是回答?考慮接受答案關閉它。 – maasg