2017-08-24 109 views
1

Spark的結構化流中是否有方法將最終操作添加到DataStreamWriter的查詢計劃中?我試圖從流數據源讀取數據,以某種方式豐富數據,然後以實木複合格式回寫到分區的外部表(假定爲Hive)。寫操作工作得很好,爲我分配目錄中的數據,但我似乎無法弄清楚在將數據寫入磁盤之後,如何爲可能已創建的任何新分區另外運行MSCK REPAIR TABLEALTER TABLE ADD PARTITION操作。Spark結構化流式處理運行在外部表上的最終操作(MSCK REPAIR)

爲了簡便起見,採取以下Scala代碼作爲示例:

SparkSession 
    .builder() 
    .appName("some name") 
    .enableHiveSupport() 
    .getOrCreate() 
    .readStream 
    .format("text") 
    .load("/path/from/somewhere") 
    // additional transformations 
    .writeStream 
    .format("parquet") 
    .partitionBy("some_column") 
    .start("/path/to/somewhere") 
    <-------------------- something I can place here for an additional operation? 
    .awaitTermination() 

潛在的解決方法?:

1:可能使用類似.foreach(new ForeachWriter[Row])和傳遞FileStreamSink或類似的東西將工作(使用def close()運行外部查詢),但我沒有充分研究它,以便很好地掌握使用它。 - 在批次完成後使用ForeachWriter不會導致調用close()方法。

2:分岔流。沿着以下幾條線:

val stream = SparkSession 
    .builder() 
    .appName("some name") 
    .enableHiveSupport() 
    .getOrCreate() 
    .readStream 
    .format("text") 
    .load("/path/from/somewhere") 
    // additional transformations 

stream 
    .writeStream 
    .format("parquet") 
    .partitionBy("some_column") 
    .start("/path/to/somewhere") 
    .awaitTermination() 

stream 
    .map(getPartitionName).distinct 
    .map { partition => 

    // Run query here 

    partition 
    } 
    .writeStream 
    .start() 
    .awaitTermination() 

這裏的問題是確保第一個操作在第二個操作之前完成。

3:命名查詢併爲完成的批處理添加一個偵聽器,手動添加所有分區。有點浪費,但可能是可行的?

... 
stream 
    .writeStream 
    .queryName("SomeName") 
... 


spark.streams.addListener(new StreamingQueryListener() { 

    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = Unit 
    override def onQueryProgress(event: QueryProgressEvent): Unit = { 
    if (event.progress.name == "SomeName") { 
     // search through files in filesystem and add partitions 

     fileSystem.listDir("/path/to/directory").foreach { partition => 
     // run "ALTER TABLE ADD PARTITION $partition" 
     } 
    } 
    } 
    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = Unit 
}) 

我在文檔中沒有看到任何內容,希望我沒有錯過任何東西。提前致謝。

回答

0

使用StreamingQueryListener的作品,雖然我不知道這是好還是壞的做法。

我實現了東西沿着這行:

spark.streams.addListener(new StreamingQueryListener() { 

    val client = new Client() 

    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = Unit 
    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = Unit 
    override def onQueryProgress(event: QueryProgressEvent): Unit = { 
    if (event.progress.numInputRows > 0 && event.progress.sink.description.startsWith("FileSink") && event.progress.sink.description.contains("/path/to/write/directory")) { 
     client.sql(s"MSCK REPAIR TABLE $db.$table") 
    } 
    } 
}) 

如果你碰巧有基於時間的分區,這個工程只要正派,你打算基於now()創建分區:

spark.streams.addListener(new StreamingQueryListener() { 

    val client = new Client() 
    var lastPartition: String = "" 
    val dateTimeFormat: String = "yyyy-MM-dd" 

    override def onQueryStarted... 
    override onQueryTerminated... 
    override def onQueryProgress(event: QueryProgressEvent): Unit = { 
    if (event.progress.numInputRows > 0 && event.progress.sink.description.startsWith("FileSink[s3") && event.progress.sink.description.contains("/path/to/write/directory")) { 

     val newPartition = new DateTime().toString(dateTimeFormat) 

     if (newPartition != lastPartition) { 
     client.sql(s"ALTER TABLE $db.$table ADD IF NOT EXISTS PARTITION ($partitionColumn='$newPartition')") 
     lastPartition = newPartition 
    } 
    } 
} 
相關問題