The only topic touching on this is here, but it does not solve my problem: rebuilding Parquet files from an HDFS getmerge backup.
The problem here is that we made local backups of our Parquet data with:
$ hadoop fs -getmerge /dir/on/hdfs /local/dir
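For context: as far as I understand, getmerge simply concatenates every file of the source directory into a single local file, so the backup we got is roughly equivalent to something like (file names just illustrative):
$ hadoop fs -cat /dir/on/hdfs/* > /local/dir/merged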
The mistake we made was thinking that Parquet's organization into multiple files was just a side effect of how HDFS writes data, without understanding that this is the "normal" layout of a Parquet dataset. So (not very cleverly) we used HDFS getmerge for the backup. The problem is that the original data has since been deleted, and we are now trying to recover it.
When analyzing Parquet (and reading the docs), we found that each data file is made of blocks of data + metadata framed between the magic number "PAR1", and that on top of these there are two extra files holding the metadata of the dataset: _metadata and _common_metadata.
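A quick way to check this layout on a healthy Parquet file (the file name below is just an example) is to look at the first and last 4 bytes, which should both be the magic number:
$ head -c 4 part-r-00000.gz.parquet    # prints PAR1
$ tail -c 4 part-r-00000.gz.parquet    # prints PAR1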
By observing the order in which getmerge had processed the files (of the original Parquet directory on HDFS), I came up with a script that extracts the data found between two "PAR1" markers into block files. The first two files rebuilt this way are (_common_metadata, _metadata).
#!/bin/bash
# Rebuild individual Parquet files from a single "getmerge'd" backup file ($1).

filePrefix='part-'
finalFilePrefix='part-r-'

# Split the merged file on the 'PAR1' magic number; every second record
# (the content found between two markers) goes to a numbered part file.
awk -v RS='PAR1' 'NR%2==0{ print $0 > ("part-" i++) }' "$1"

nbFiles=$(ls | grep -c "^$filePrefix")

for num in $(seq 0 "$nbFiles")
do
    fileName="$filePrefix$num"
    [ -e "$fileName" ] || continue

    if [ "$num" -eq 0 ]; then
        # getmerge wrote _common_metadata first ...
        lastName="_common_metadata"
    elif [ "$num" -eq 1 ]; then
        # ... then _metadata ...
        lastName="_metadata"
    else
        # ... then the data files, renamed back to the usual Spark naming.
        count=$(printf "%05d" $((num - 2)))
        lastName="$finalFilePrefix$count.gz.parquet"
    fi

    # Put the 'PAR1' magic back at the head and tail of the extracted block.
    awk '{print "PAR1" $0 "PAR1"}' "$fileName" > "$lastName"

    echo "$lastName"
    # Drop the last byte (empirically a trailing newline added by awk's print).
    truncate --size=-1 "$lastName"
    rm -f "$fileName"
done

# Rebuild a directory that looks like the original Parquet dataset.
mv "$1" "$1.backup"
mkdir "$1"
mv _* "$1"
mv part* "$1"
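For clarity, the script above is saved to a file and invoked with the merged backup as its argument, for example (rebuild_parquet.sh is just a placeholder name; the part files are created in the current directory):
$ bash rebuild_parquet.sh ./merged-backup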
Some observations about the script:
- It takes the "getmerge'd" Parquet file as its argument.
- All the parts created are moved into a directory named after the original file (the merged file itself being renamed to <name>.backup first).
- One byte has to be removed from the end of each file (the truncate step). This was found empirically, because otherwise spark sc.load.parquet() could not read the metadata files (see the small demo just before the code below).
- Finally, we uploaded everything back to HDFS with hadoop fs -put.
- As I said, when trying this out, the _metadata (and obviously _common_metadata) files read fine and load into a DataFrame, but we still get an error when the data blocks are loaded (code and error below).
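A side note on the truncate step mentioned above: my assumption is that the extra byte comes from awk's print, which always appends a newline to its output, so each rebuilt file ends with "PAR1" plus one stray byte. A tiny demo:
$ printf 'abc' | awk '{print $0}' | wc -c
4
Three bytes in, four bytes out: the extra byte is the trailing newline that truncate removes.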
Code:
val newDataDF = sqlContext.read.parquet("/tmp/userActionLog2-leclerc-culturel-2016.09.04.parquet")
newDataDF.take(1)
Error:
newDataDF: org.apache.spark.sql.DataFrame = [bson: binary]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 5, hdp-node4.affinytix.com): java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 13
at org.apache.parquet.format.Util.read(Util.java:216)
at org.apache.parquet.format.Util.readPageHeader(Util.java:65)
at org.apache.parquet.hadoop.ParquetFileReader$WorkaroundChunk.readPageHeader(ParquetFileReader.java:668)
at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:546)
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:496)
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.checkEndOfRowGroup(UnsafeRowParquetRecordReader.java:604)
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.loadBatch(UnsafeRowParquetRecordReader.java:218)
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.nextKeyValue(UnsafeRowParquetRecordReader.java:196)
at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: parquet.org.apache.thrift.protocol.TProtocolException: don't know what type: 13
at parquet.org.apache.thrift.protocol.TCompactProtocol.getTType(TCompactProtocol.java:806)
at parquet.org.apache.thrift.protocol.TCompactProtocol.readFieldBegin(TCompactProtocol.java:500)
at org.apache.parquet.format.InterningProtocol.readFieldBegin(InterningProtocol.java:158)
at org.apache.parquet.format.PageHeader.read(PageHeader.java:828)
at org.apache.parquet.format.Util.read(Util.java:213)
... 32 more
Given that our data is at stake here, if anyone has any idea that could help, I sincerely thank him (or her) in advance.
Cheers