我試圖從Kafka做結構化流式處理。我打算將檢查點存儲在HDFS中。我讀了一個cloudera博客,建議不要在HDFS中爲Spark流存儲檢查點。結構流式檢查點是同樣的問題嗎? https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/。Kafka結構化流式檢查點
在結構化流媒體中,如果我的火花程序停機一段時間,如何從檢查點目錄獲取最新偏移量,並在該偏移量之後加載數據。 我將檢查點存儲在如下所示的目錄中。
df.writeStream\
.format("text")\
.option("path", '\files') \
.option("checkpointLocation", 'checkpoints\chkpt') \
.start()
更新:
這是我的結構化數據流的程序讀取卡夫卡消息,解壓縮並寫入到HDFS。
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.option("failOnDataLoss", "false")\
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream
query = decomp.writeStream\
.format("text")\
.option("path", \Data_directory_inHDFS) \
.option("checkpointLocation", \pathinDHFS\) \
.start()
query.awaitTermination()
你確定博客的建議你不要檢查點存儲在HDFS?這很奇怪。你有鏈接嗎?對於結構化流式問題,只需使用相同的檢查點目錄運行相同的代碼,結構化流將拾取最後一個失敗偏移並從中重新啓動。 – zsxwing
@zsxwing這是cloudera博客鏈接https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/我手動殺死了我的流媒體節目分鐘,然後再次啓動它,並在啓動後纔開始處理收到的消息。它忽略了錯過的消息,當它發生故障並且它沒有再處理它們時 –
你可以看看驅動程序日誌,並查找由logInfo(s「GetBatch調用start = $ start,end = $ end」)輸出的日誌嗎?它應該告訴你查詢處理了什麼。 – zsxwing