2017-07-28 156 views
0

我是新來的火花流。我正在嘗試使用本地csv文件進行結構化的火花流式傳輸。我在處理時遇到以下異常。爲什麼我的查詢在使用AnalysisException時失敗?

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; 
FileSource[file:///home/Teju/Desktop/SparkInputFiles/*.csv] 

這是我的代碼。

val df = spark 
    .readStream 
    .format("csv") 
    .option("header", "false") // Use first line of all files as header 
    .option("delimiter", ":") // Specifying the delimiter of the input file 
    .schema(inputdata_schema) // Specifying the schema for the input file 
    .load("file:///home/Teju/Desktop/SparkInputFiles/*.csv") 

val filterop = spark.sql("select tagShortID,Timestamp,ListenerShortID,rootOrgID,subOrgID,first(rssi_weightage(RSSI)) as RSSI_Weight from my_table where RSSI > -127 group by tagShortID,Timestamp,ListenerShortID,rootOrgID,subOrgID order by Timestamp ASC") 
val outStream = filterop.writeStream.outputMode("complete").format("console").start() 

我創建cron作業所以每次5分鐘,我會得到一個輸入CSV file.I上午試圖通過火花streaming.Any幫助解析將不勝感激。

+1

'df'和其他數據集'filterop'和'outStream'之間有什麼關係?你不要在粘貼的代碼中使用'df'。這是故意的嗎?我會說代碼不能按原樣執行。有一些重要的缺失。 –

回答

-1

.writeStream.start添加到您的df,因爲例外是告訴你。

閱讀docs瞭解更多詳情。

+0

在粘貼的代碼中應該添加'.writeStream.start'?我不明白它是如何解決這個問題的。謹慎闡述? –

+0

@JacekLaskowski數據幀'''df'''永遠不會顯式地使用''writeStream.start'''ed。我懷疑查詢必須是「完整的」,而沒有關閉循環的隱式/帶外查詢。我懷疑csv的流加載從未被觸發,因爲輸入和輸出之間沒有鏈接。我懷疑用''''df.select'''替換''spark.sql'''解決了這個問題。 –

0

(這不是一個解決方案,但更多的是一個評論,但考慮到它的長度,它最終會在這裏結束,我會在收集足夠的信息以供調查後最終做出答案)。


猜測是你做的,你有沒有包含在你的問題上df不正確的數據。

由於錯誤信息是關於FileSource,路徑如下所示,它是一個流數據集,必須是df正在發揮作用。

FileSource [文件:///home/Teju/Desktop/SparkInputFiles/*.csv]

考慮到其他行我您註冊流數據集作爲一個臨時表(即my_table),然後您在spark.sql中使用SQL和writeStream到控制檯。

df.createOrReplaceTempView("my_table") 

如果這是正確的,你已經包含在問題的代碼是不完整,不顯示對錯誤的原因。