2017-08-16 44 views
3

我正在使用Spark 2.1並試圖正常停止Streaming查詢。正常停止結構化流式查詢

StreamingQuery.stop()正常停止,因爲我還沒有看到這種方法的任何詳細信息,可documentation

void stop() 如果正在運行,停止該查詢的執行。 此方法阻塞,直到執行的線程停止。 因爲:2.0.0

而在過去,流世界(DStreams)有一個option to stop流的執行,以確保所有選項接收到的數據已被處理:

def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit 停止流的執行,並選擇確保所有接收到的數據已被處理。

stopSparkContext 如果爲true,則停止關聯的SparkContext。不管這個 StreamingContext是否已經啓動,底層的SparkContext都會停止。

stopGracefully 如果屬實,將等待所有接收到的數據的處理完成

所以現在的問題是如何正常停止結構化查詢流站正常?

回答

0

這要看是什麼意思是 「優雅」 :)

StreamingQuery只停止特定的查詢。它一直等到MicroBatch線程停止並準備關閉源代碼。這個「等待」意味着數據將被處理,然後線程將停止

+0

請問你可以參考一個提到這個的來源嗎?我感到困惑,因爲有一個名爲processAllavaialble()的方法,它可以做同樣的事情..但是文檔說「它只是爲了測試」。 https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/streaming/StreamingQuery.html#processAllAvailable() – shiv455

1

如果「優雅地」表示流式查詢應該完成數據處理,那麼void stop()不會這樣做。它只會等待執行的線程停止(如文檔中所述)。這並不意味着它會完成處理。

爲此,我們需要使查詢等待,直到查詢的當前觸發完成。我們可以通過StreamingQueryStatus,像這樣的檢查:

而(query.status.isTriggerActive){//什麼都不做}

它會等到查詢已完成處理。然後我們可以撥打query.stop()

我希望它有幫助!