1

我試圖從IntelliJ的想法運行下面的代碼,從卡夫卡打印消息到控制檯。但它引發以下錯誤 - 在線程Spark結構化流 -

異常「主要」 org.apache.spark.sql.AnalysisException:查詢與流媒體源必須與writeStream.start(執行);; kafka

Stacktrace從Dataset.checkpoint開始並向上調整。如果我刪除.checkPoint(),那麼我得到一些其他錯誤 - 與權限相關 17/08/02 12:10:52錯誤StreamMetadata:寫入流元數據StreamMetadata(4e612f22-efff-4c9a-a47a-a36eb533e9d6) C:/ Users/rp/AppData/Local/Temp/temporary-2f570b97-ad16-4f00-8356-d43ccb7660db/metadata java.io.IOException:(null)entry in command string:null chmod 0644 C:\ Users \ rp \應用程序數據\本地的\ Temp \臨時2f570b97-ad16-4f00-8356-d43ccb7660db \元

def main(args : Array[String]) = { 
val spark = SparkSession.builder().appName("SparkStreaming").master("local[*]").getOrCreate() 
    val canonicalSchema = new StructType() 
          .add("cid",StringType) 
          .add("uid",StringType) 
          .add("sourceSystem", 
           new StructType().add("id",StringType) 
               .add("name",StringType)) 
          .add("name", new StructType() 
             .add("firstname",StringType) 
             .add("lastname",StringType)) 


    val messages = spark 
        .readStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers","localhost:9092") 
        .option("subscribe","c_canonical") 
        .option("startingOffset","earliest") 
        .load() 
        .checkpoint() 
.select(from_json(col("value").cast("string"),canonicalSchema)) 
.writeStream.outputMode("append").format("console").start.awaitTermination 

} 

誰能請幫助我瞭解,我做錯了嗎?

+0

嘗試以管理員身份運行IntelliJ。 –

+0

謝謝你的回覆,但沒有奏效。 –

回答

1
  1. 結構化流式傳輸不支持Dataset.checkpoint()。有一張開放票可以提供更好的消息或者忽略它:https://issues.apache.org/jira/browse/SPARK-20927

  2. IOException可能是因爲您沒有在Windows上安裝cygwin。

+0

儘管結構化流不支持'checkpoint()',它支持'option(「checkpointLocation」,「/ path/to/store」)'。你會說這個叫什麼? –

+0

更新了答案。不幸的是,他們使用相同的詞,但他們是完全不同的東西。 – zsxwing

+0

有沒有我能找到'checkpointLocation'的實際含義以及它有何不同? –

相關問題