Spark的結構化流中是否有方法將最終操作添加到DataStreamWriter
的查詢計劃中?我試圖從流數據源讀取數據,以某種方式豐富數據,然後以實木複合格式回寫到分區的外部表(假定爲Hive)。寫操作工作得很好,爲我分配目錄中的數據,但我似乎無法弄清楚在將數據寫入磁盤之後,如何爲可能已創建的任何新分區另外運行MSCK REPAIR TABLE
或ALTER TABLE ADD PARTITION
操作。Spark結構化流式處理運行在外部表上的最終操作(MSCK REPAIR)
爲了簡便起見,採取以下Scala代碼作爲示例:
SparkSession
.builder()
.appName("some name")
.enableHiveSupport()
.getOrCreate()
.readStream
.format("text")
.load("/path/from/somewhere")
// additional transformations
.writeStream
.format("parquet")
.partitionBy("some_column")
.start("/path/to/somewhere")
<-------------------- something I can place here for an additional operation?
.awaitTermination()
潛在的解決方法?:
1:可能使用類似 - 在批次完成後使用ForeachWriter不會導致調用.foreach(new ForeachWriter[Row])
和傳遞FileStreamSink
或類似的東西將工作(使用def close()
運行外部查詢),但我沒有充分研究它,以便很好地掌握使用它。close()
方法。
2:分岔流。沿着以下幾條線:
val stream = SparkSession
.builder()
.appName("some name")
.enableHiveSupport()
.getOrCreate()
.readStream
.format("text")
.load("/path/from/somewhere")
// additional transformations
stream
.writeStream
.format("parquet")
.partitionBy("some_column")
.start("/path/to/somewhere")
.awaitTermination()
stream
.map(getPartitionName).distinct
.map { partition =>
// Run query here
partition
}
.writeStream
.start()
.awaitTermination()
這裏的問題是確保第一個操作在第二個操作之前完成。
3:命名查詢併爲完成的批處理添加一個偵聽器,手動添加所有分區。有點浪費,但可能是可行的?
...
stream
.writeStream
.queryName("SomeName")
...
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = Unit
override def onQueryProgress(event: QueryProgressEvent): Unit = {
if (event.progress.name == "SomeName") {
// search through files in filesystem and add partitions
fileSystem.listDir("/path/to/directory").foreach { partition =>
// run "ALTER TABLE ADD PARTITION $partition"
}
}
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = Unit
})
我在文檔中沒有看到任何內容,希望我沒有錯過任何東西。提前致謝。