2016-12-05

I created a custom ParquetOutputFormat (placed in the package org.apache.parquet.hadoop) to override the getRecordWriter method. Inside getRecordWriter it accesses CodecFactory, which causes an IllegalAccessError. To work around this I tried creating my own class loader, following this blog post on dynamic class loading, but it didn't help: http://techblog.applift.com/upgrading-spark#advanced-case-parquet-writer

Before creating the custom class loader, I was using the CustomParquetOutputFormat as follows:

override def createOutputFormat: OutputFormat[Void, InternalRow] with Ext = new CustomParquetOutputFormat[InternalRow]() with Ext { 
... 
} 

The problem occurs when getRecordWriter is called and CustomParquetOutputFormat tries to access CodecFactory at line 274:

CodecFactory codecFactory = new CodecFactory(conf); 

(This is line 274 of ParquetOutputFormat, which CustomParquetOutputFormat accesses.)

CodecFactory is package-private.

The custom class loader:

class CustomClassLoader(urls: Array[URL], parent: ClassLoader, whiteList: List[String])
  extends ChildFirstURLClassLoader(urls, parent) {

  override def loadClass(name: String) = {
    if (whiteList.exists(name.startsWith)) {
      super.loadClass(name)  // whitelisted classes go through the child-first loader
    } else {
      parent.loadClass(name) // everything else is delegated to the parent
    }
  }
}

Usage:

val sc: SparkContext = SparkContext.getOrCreate() 
val cl: CustomClassLoader = new CustomClassLoader(sc.jars.map(new URL(_)).toArray, 
    Thread.currentThread.getContextClassLoader, List(
    "org.apache.parquet.hadoop.CustomParquetOutputFormat", 
    "org.apache.parquet.hadoop.CodecFactory", 
    "org.apache.parquet.hadoop.ParquetFileWriter", 
    "org.apache.parquet.hadoop.ParquetRecordWriter", 
    "org.apache.parquet.hadoop.InternalParquetRecordWriter", 
    "org.apache.parquet.hadoop.ColumnChunkPageWriteStore", 
    "org.apache.parquet.hadoop.MemoryManager" 
)) 


cl.loadClass("org.apache.parquet.hadoop.CustomParquetOutputFormat") 
    .getConstructor(classOf[String], classOf[TaskAttemptContext]) 
    .newInstance(fullPathWithoutExt, taskAttemptContext) 
    .asInstanceOf[OutputFormat[Void, InternalRow] with ProvidesExtension] 

The error:

java.lang.IllegalAccessError: tried to access class org.apache.parquet.hadoop.CodecFactory from class org.apache.parquet.hadoop.customParquetOutputFormat 
     at org.apache.parquet.hadoop.CustomParquetOutputFormat.getRecordWriter(CustomParquetOutputFormat.scala:40) 
     at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) 
     at org.apache.spark.custom.hadoop.HadoopWriter.<init>(HadoopWriter.scala:35) 
     at org.apache.spark.sql.execution.datasources.parquet.ParquetWriter.<init>(ParquetWriter.scala:16) 
     at org.apache.spark.sql.execution.datasources.parquet.ParquetWriterFactory.createWriter(ParquetWriterFactory.scala:71) 
     at com.abden.custom.index.IndexBuilder$$anonfun$4.apply(IndexBuilder.scala:55) 
     at com.abden.custom.index.IndexBuilder$$anonfun$4.apply(IndexBuilder.scala:54) 
     at scala.collection.immutable.Stream.map(Stream.scala:418) 
     at com.abden.custom.index.IndexBuilder.generateTiles(IndexBuilder.scala:54) 
     at com.abden.custom.index.IndexBuilder.generateLayer(IndexBuilder.scala:155) 
     at com.abden.custom.index.IndexBuilder.appendLayer(IndexBuilder.scala:184) 
     at com.abden.custom.index.IndexBuilder$$anonfun$appendLayers$1$$anonfun$apply$1.apply(IndexBuilder.scala:213) 
     at com.abden.custom.index.IndexBuilder$$anonfun$appendLayers$1$$anonfun$apply$1.apply(IndexBuilder.scala:210) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:742) 
     at com.abden.custom.util.SplittingByKeyIterator.foreach(SplittingByKeyIterator.scala:3) 
     at com.abden.custom.index.IndexBuilder$$anonfun$appendLayers$1.apply(IndexBuilder.scala:210) 
     at com.abden.custom.index.IndexBuilder$$anonfun$appendLayers$1.apply(IndexBuilder.scala:209) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

The error occurs at this line in getRecordWriter:

val codecFactory = new CodecFactory(conf) 

CodecFactory has no access modifier, so it is restricted to its package. Even when I use the dynamic class loader to load all of these classes from the same class loader, I still get the IllegalAccessError.
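
To narrow this down, here is a minimal diagnostic sketch (not part of the actual job, and it assumes the CustomClassLoader instance cl from the usage above): package-private access only works when both classes are defined by the same class loader, not merely looked up through the same one.

// Diagnostic sketch: check which loader actually defined each class.
val fmtClass   = cl.loadClass("org.apache.parquet.hadoop.CustomParquetOutputFormat")
val codecClass = cl.loadClass("org.apache.parquet.hadoop.CodecFactory")
println(s"CustomParquetOutputFormat loader: ${fmtClass.getClassLoader}")
println(s"CodecFactory loader:              ${codecClass.getClassLoader}")
println(s"same defining loader: ${fmtClass.getClassLoader eq codecClass.getClassLoader}")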

It's quite odd that the error message shows 'customParquetOutputFormat' (lowercase c) while everything else refers to 'CustomParquetOutputFormat' (uppercase C). Apart from that, you should be aware that 'super.loadClass(name)' also checks the parent loader first and only resolves the class locally if the parent doesn't find it. Furthermore, classes loaded by different class loaders are always considered to be in different (runtime) packages, regardless of their names. – Holger

Sorry, I fixed the error message. I renamed the class for this question and accidentally used lowercase. – abden003

Hi, could you share your code from before the custom class loader, so we can understand your original problem? Implementing your own class loader seems like overkill here... – loicmathieu

Answer


So what you are trying to do is break how Java works! You want to access a class that is package-private from outside its package by implementing your own class loader to circumvent the JVM's protection rules (in other words, you want to break the Java Language Specification!).

My answer is simple: don't do it!

If it is package-private, you cannot access it. Period!

I think the best approach is to think in terms of the functionality you need and implement it with the current API, rather than trying to force your way in. So instead of asking how to pull off a technical hack, it would be better to explain what you want to achieve (and why you want to implement your own getRecordWriter method).

I already gave an answer on this SO question about how to read/write Parquet files in plain Java: Write Parquet format to HDFS using Java API with out using Avro and MR
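
For reference, here is a minimal sketch of that approach in Scala, using the example Group API shipped with parquet-mr (it assumes a parquet-mr version that provides ExampleParquetWriter; the schema, field names and output path are made up for illustration):

import org.apache.hadoop.fs.Path
import org.apache.parquet.example.data.simple.SimpleGroupFactory
import org.apache.parquet.hadoop.example.ExampleParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.schema.MessageTypeParser

// Hypothetical schema and output path, purely for illustration.
val schema = MessageTypeParser.parseMessageType(
  """message example {
    |  required int32 id;
    |  required binary name (UTF8);
    |}""".stripMargin)

val writer = ExampleParquetWriter
  .builder(new Path("/tmp/example.parquet"))
  .withType(schema)
  .withCompressionCodec(CompressionCodecName.SNAPPY)
  .build()

// Write one record per Group and close the writer to flush the footer.
val factory = new SimpleGroupFactory(schema)
writer.write(factory.newGroup().append("id", 1).append("name", "foo"))
writer.close()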

Regards,

Loïc