2017-10-06 83 views
1

我試圖從Kafka做結構化流式處理。我打算將檢查點存儲在HDFS中。我讀了一個cloudera博客,建議不要在HDFS中爲Spark流存儲檢查點。結構流式檢查點是同樣的問題嗎? https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/Kafka結構化流式檢查點

在結構化流媒體中,如果我的火花程序停機一段時間,如何從檢查點目錄獲取最新偏移量,並在該偏移量之後加載數據。 我將檢查點存儲在如下所示的目錄中。

df.writeStream\ 
     .format("text")\ 
     .option("path", '\files') \ 
     .option("checkpointLocation", 'checkpoints\chkpt') \ 
     .start() 

更新:

這是我的結構化數據流的程序讀取卡夫卡消息,解壓縮並寫入到HDFS。

df = spark \ 
     .readStream \ 
     .format("kafka") \ 
     .option("kafka.bootstrap.servers", KafkaServer) \ 
     .option("subscribe", KafkaTopics) \ 
     .option("failOnDataLoss", "false")\ 
     .load() 
Transaction_DF = df.selectExpr("CAST(value AS STRING)") 
Transaction_DF.printSchema() 

decomp = Transaction_DF.select(zip_extract("value").alias("decompress")) 
#zip_extract is a UDF to decompress the stream 

query = decomp.writeStream\ 
    .format("text")\ 
    .option("path", \Data_directory_inHDFS) \ 
    .option("checkpointLocation", \pathinDHFS\) \ 
    .start() 

query.awaitTermination() 
+0

你確定博客的建議你不要檢查點存儲在HDFS?這很奇怪。你有鏈接嗎?對於結構化流式問題,只需使用相同的檢查點目錄運行相同的代碼,結構化流將拾取最後一個失敗偏移並從中重新啓動。 – zsxwing

+0

@zsxwing這是cloudera博客鏈接https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/我手動殺死了我的流媒體節目分鐘,然後再次啓動它,並在啓動後纔開始處理收到的消息。它忽略了錯過的消息,當它發生故障並且它沒有再處理它們時 –

+0

你可以看看驅動程序日誌,並查找由logInfo(s「GetBatch調用start = $ start,end = $ end」)輸出的日誌嗎?它應該告訴你查詢處理了什麼。 – zsxwing

回答

0

在您的查詢,嘗試將一個檢查點,而寫結果像實木複合地板的一些格式像HDFS一些持久性存儲。它對我有好處。

您可以分享您的代碼,以便我們可以更深入地瞭解一下嗎?

+0

我在問題中添加了完整的代碼作爲更新。你是如何得到檢查點文件的最新偏移量的? –

+0

雖然這樣做,但您如何防止HDFS中的檢查點隨着時間的推移使用越來越多的存儲?有沒有可用於管理的「清理」配置? –

2

在長期存儲(HDFS,AWS S3等)上存儲檢查點是最優選的。我想在此添加一點,即屬性「failOnDataLoss」不應該設置爲false,因爲它不是最佳做法。數據丟失是任何人都不願意承擔的。休息你在正確的道路上。

+0

雖然這樣做,你如何防止HDFS中的檢查點隨着時間的推移使用越來越多的存儲?有沒有可用於管理的「清理」配置? –

+0

據我所知,檢查點不存儲太多的數據,它像Kafka一樣存儲偏移量,所以你不必擔心存儲問題,如果你想清除檢查點,你可以在維護過程中做到這一點,或者你可以把這是一個調度器。 –

+0

我在SparkConf上使用「spark.cleaner.referenceTracking.cleanCheckpoints」,「true」,爲我工作乾淨的檢查點。 –

0

正如我所理解的那樣,它建議在Hbase,Kafka,HDFS或Zookeeper中維護偏移量管理。

「值得一提的是,你還可以存儲偏移在存儲 系統,如HDFS。HDFS中存儲的偏移量是一個不太常用的方法 比上述選項HDFS有着較高的延遲相比 其他像ZooKeeper和HBase這樣的系統。「

您可以在星火文檔找到如何從現有的檢查站重新啓動的查詢:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

相關問題