我是新來的火花流。我正在嘗試使用本地csv文件進行結構化的火花流式傳輸。我在處理時遇到以下異常。爲什麼我的查詢在使用AnalysisException時失敗?
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[file:///home/Teju/Desktop/SparkInputFiles/*.csv]
這是我的代碼。
val df = spark
.readStream
.format("csv")
.option("header", "false") // Use first line of all files as header
.option("delimiter", ":") // Specifying the delimiter of the input file
.schema(inputdata_schema) // Specifying the schema for the input file
.load("file:///home/Teju/Desktop/SparkInputFiles/*.csv")
val filterop = spark.sql("select tagShortID,Timestamp,ListenerShortID,rootOrgID,subOrgID,first(rssi_weightage(RSSI)) as RSSI_Weight from my_table where RSSI > -127 group by tagShortID,Timestamp,ListenerShortID,rootOrgID,subOrgID order by Timestamp ASC")
val outStream = filterop.writeStream.outputMode("complete").format("console").start()
我創建cron作業所以每次5分鐘,我會得到一個輸入CSV file.I上午試圖通過火花streaming.Any幫助解析將不勝感激。
'df'和其他數據集'filterop'和'outStream'之間有什麼關係?你不要在粘貼的代碼中使用'df'。這是故意的嗎?我會說代碼不能按原樣執行。有一些重要的缺失。 –