
Spark SQL cannot finish writing Parquet data with a large number of shards

I want to use Apache Spark SQL to convert JSON log data in S3 into Parquet files, also on S3. My code is essentially:

import org.apache.spark._ 
val sqlContext = new sql.SQLContext(sc) 
// The tiny sampling ratio (10e-6) keeps schema inference cheap on a large dataset. 
val data = sqlContext.jsonFile("s3n://...", 10e-6) 
data.saveAsParquetFile("s3n://...") 

This code works when I have up to 2000 partitions, but fails for 5000 or more, regardless of the volume of data. Normally one could just coalesce the partitions down to an acceptable number, but this is a very large data set, and at 2000 partitions I hit this question.
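A minimal sketch of that coalesce workaround, assuming SchemaRDD's coalesce preserves the schema so the result can still be saved as Parquet; the target of 2000 partitions is illustrative:

// Merge input partitions down before writing, so fewer part files
// (and footers) are produced; coalesce avoids a full shuffle.
val coalesced = data.coalesce(2000)
coalesced.saveAsParquetFile("s3n://...")

With 5000 or more partitions, the write instead fails like this: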

14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s 
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s 
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ... 
java.io.IOException: Could not read footer: java.lang.NullPointerException 
     at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190) 
     at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203) 
     at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49) 
     at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319) 
     at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246) 
     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409) 
     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409) 
     at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77) 
     at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54) 
     at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:56) 
     at $line37.$read$$iwC$$iwC.<init>(<console>:58) 
     at $line37.$read$$iwC.<init>(<console>:60) 
     at $line37.$read.<init>(<console>:62) 
     at $line37.$read$.<init>(<console>:66) 
     at $line37.$read$.<clinit>(<console>) 
     at $line37.$eval$.<init>(<console>:7) 
     at $line37.$eval$.<clinit>(<console>) 
     at $line37.$eval.$print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) 
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) 
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) 
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) 
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) 
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) 
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) 
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) 
     at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997) 
     at org.apache.spark.repl.Main$.main(Main.scala:31) 
     at org.apache.spark.repl.Main.main(Main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.NullPointerException 
     at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106) 
     at java.io.BufferedInputStream.close(BufferedInputStream.java:472) 
     at java.io.FilterInputStream.close(FilterInputStream.java:181) 
     at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298) 
     at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180) 
     at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

I'm running this on Spark 1.1.0, on an r3.xlarge in EC2, using the spark-shell console to run the code above. Afterwards I can run non-trivial queries against the SchemaRDD object, so it does not appear to be a resource problem. The resulting Parquet file can also be read and queried; it just takes a very long time because of the missing summary file.
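For reference, reading the output back looks like this (a sketch; the path is a placeholder as in the original, and the table name is arbitrary):

// Reading works, but is slow without the _metadata summary file,
// since every part file's footer must be opened individually.
val parquetData = sqlContext.parquetFile("s3n://...")
parquetData.registerTempTable("logs")
sqlContext.sql("SELECT COUNT(*) FROM logs").collect()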


I'll file a bug about this. https://issues.apache.org/jira/browse/SPARK/ – 2014-10-31 15:44:49

Answers


Try setting this property to false:

// In the spark-shell, sc is the pre-created SparkContext:
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
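Disabling the summary metadata skips the commit-time step that reads every footer back from S3 (the readAllFootersInParallel call in the trace above), which is where the NullPointerException occurs. A sketch of the full flow from the question with this setting applied, paths as placeholders:

// Set before saving, so the Parquet output committer skips the _metadata file.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
val data = sqlContext.jsonFile("s3n://...", 10e-6)
data.saveAsParquetFile("s3n://...")

The trade-off is the one already noted in the question: without a summary file, readers must open each part file's footer, which is slow on S3.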