我試圖從IntelliJ的想法運行下面的代碼,從卡夫卡打印消息到控制檯。但它引發以下錯誤 - 在線程Spark結構化流 -
異常「主要」 org.apache.spark.sql.AnalysisException:查詢與流媒體源必須與writeStream.start(執行);; kafka
Stacktrace從Dataset.checkpoint開始並向上調整。如果我刪除.checkPoint(),那麼我得到一些其他錯誤 - 與權限相關 17/08/02 12:10:52錯誤StreamMetadata:寫入流元數據StreamMetadata(4e612f22-efff-4c9a-a47a-a36eb533e9d6) C:/ Users/rp/AppData/Local/Temp/temporary-2f570b97-ad16-4f00-8356-d43ccb7660db/metadata java.io.IOException:(null)entry in command string:null chmod 0644 C:\ Users \ rp \應用程序數據\本地的\ Temp \臨時2f570b97-ad16-4f00-8356-d43ccb7660db \元
def main(args : Array[String]) = {
val spark = SparkSession.builder().appName("SparkStreaming").master("local[*]").getOrCreate()
val canonicalSchema = new StructType()
.add("cid",StringType)
.add("uid",StringType)
.add("sourceSystem",
new StructType().add("id",StringType)
.add("name",StringType))
.add("name", new StructType()
.add("firstname",StringType)
.add("lastname",StringType))
val messages = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe","c_canonical")
.option("startingOffset","earliest")
.load()
.checkpoint()
.select(from_json(col("value").cast("string"),canonicalSchema))
.writeStream.outputMode("append").format("console").start.awaitTermination
}
誰能請幫助我瞭解,我做錯了嗎?
嘗試以管理員身份運行IntelliJ。 –
謝謝你的回覆,但沒有奏效。 –