2017-01-10 99 views
0

唯一的接觸主題是here,但它不能解決我的問題。實木複合地板搭建HDFS getmerge恢復

這裏的問題是,我們收集拼花本地備份與:

$ hadoop fs -getmerge /dir/on/hdfs /local/dir 

作出的錯誤是,我們認爲拼花多個文件組織是由於HDFS寫,但我們不明白這是真的實木複合地板文件「正常」組織。所以(不是很聰明)我們使用HDFS的getmerge來做備份。問題是我們的數據已經被刪除,現在我們正在努力恢復它。

當分析(並閱讀doc)實木複合地板時,我們發現所有文件最初由包含數據+元數據的塊組成,其中包含幻數「PAR1」之間的元數據,並添加到此元素中的是2 - _metadata和_common_metadata - 文件的元數據。

通過觀察getmerge處理文件(hdfs上的原始parquet目錄)的順序,我想出了一個腳本,該腳本將2'PAR1'之間的數據作爲塊文件。 構建的前兩個文件是(_common_metadata,_metadata)。

filePrefix='part-' 
finalFilePrefix='part-r-' 

awk 'NR%2==0{ print $0 > "part-"i++ }' RS='PAR1' $1 

nbFiles=$(ls -lah | grep 'part-' | wc -l) 

for num in $(seq 0 $nbFiles) 
     do 
     fileName="$filePrefix$num" 
     lastName="" 
     if [ "$num" -eq "0" ]; then 
       lastName="_common_metadata" 
       awk '{print "PAR1" $0 "PAR1"}' $fileName > $lastName 
     else  

       if [ "$num" -eq "1" ]; then 
         lastName="_metadata" 
         awk '{print "PAR1" $0 "PAR1"}' $fileName > $lastName 
       else  
         if [ -e $fileName ]; then 
           count=$(printf "%05d" $(($num-2))) 
           lastName="$finalFilePrefix$count.gz.parquet" 
           awk '{print "PAR1" $0 "PAR1"}' $fileName > $lastName 
         fi  
       fi    
     fi 
     echo $lastName 
     truncate --size=-1 $lastName 
     rm -f "$fileName" 
done 

mv $1 $1.backup 
mkdir $1 
mv _* $1 
mv part* $1 

一些觀察有關腳本:

  1. 它需要一個「getmerge」實木複合地板文件中的參數
  2. 創建的移動到原來的文件命名的目錄中的所有部分(後來的幸福重命名文件名。備份)
  3. 必須在每個文件的末尾採取一個字節 - 截斷 - 這是憑經驗做出的,因爲spark sc.load.parquet()不能讀取元數據文件)否則
  4. 最終我們使用hadoop fs -put將其上傳到hdfs。
  5. 嘗試正如我所說的_metadata(和_common_metadate文件顯然)讀取確定將其加載的數據幀 但我們仍然有錯裝載塊時:

代碼:

val newDataDF = sqlContext.read.parquet("/tmp/userActionLog2-leclerc-culturel-2016.09.04.parquet") 
newDataDF.take(1) 

錯誤:

newDataDF: org.apache.spark.sql.DataFrame = [bson: binary] 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 5, hdp-node4.affinytix.com): java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 13 
at org.apache.parquet.format.Util.read(Util.java:216) 
at org.apache.parquet.format.Util.readPageHeader(Util.java:65) 
at org.apache.parquet.hadoop.ParquetFileReader$WorkaroundChunk.readPageHeader(ParquetFileReader.java:668) 
at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:546) 
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:496) 
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.checkEndOfRowGroup(UnsafeRowParquetRecordReader.java:604) 
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.loadBatch(UnsafeRowParquetRecordReader.java:218) 
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.nextKeyValue(UnsafeRowParquetRecordReader.java:196) 
at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: parquet.org.apache.thrift.protocol.TProtocolException: don't know what type: 13 
at parquet.org.apache.thrift.protocol.TCompactProtocol.getTType(TCompactProtocol.java:806) 
at parquet.org.apache.thrift.protocol.TCompactProtocol.readFieldBegin(TCompactProtocol.java:500) 
at org.apache.parquet.format.InterningProtocol.readFieldBegin(InterningProtocol.java:158) 
at org.apache.parquet.format.PageHeader.read(PageHeader.java:828) 
at org.apache.parquet.format.Util.read(Util.java:213) 
... 32 more 

鑑於我們的數據是利害攸關的,如果有人有什麼想法,可以幫助,我衷心感謝他(呃)提前。

再見

回答

0

我已經回答了問題。

我在開始時的基本想法是好的。問題在於awk(在解決方案腳本中)添加了許多字符。 因此,拼花塊在那之後是不可讀的。

解決方法是通過編程(python,perl ...)來操作合併的文件。 這是我提出的python解決方案。它等同於前一個,但不添加無用字符。

代碼:

print "create parquet script." 
import sys 
filename = sys.argv[1] 
import locale 
currencode=locale.getpreferredencoding() 

import io 
print "=====================================================================" 
print "Create parquet from: ", filename 
print "defautl buffer size: ", io.DEFAULT_BUFFER_SIZE 
print "default encoding of the system: ", currencode 
print "=====================================================================" 

import re 
magicnum = "PAR1" 
with io.open(filename, mode='rb') as f: 
     content = f.read() 
res = [ magicnum + chunk + magicnum for chunk in filter(lambda s: s!="", re.split(magicnum, content)) ] 

szcontent = len(res[2:]) 
for i in range(0,szcontent) : 
     si = str(i) 
     write_to_binfile("part-r-{}.gz.parquet".format(si.zfill(5)), res[i+2]) 

write_to_binfile("_common_metadata", res[0]) 
write_to_binfile("_metadata", res[1]) 

import os 
os.system("mv {} {}.backup".format(filename, filename)) 
os.system("mkdir {}".format(filename)) 
os.system("mv _* {}".format(filename)) 
os.system("mv part* {}".format(filename)) 

觀察: 鑲木文件不能是多少大的蟒蛇功能加載在內存中的整個事情作爲一個字符串(幾十兆都OK)! 必須在linux/unix上執行,因爲最後的系統調用是基於unix的。

再見

相關問題