2016-08-17 106 views
3

我們試圖提交一個spark工作(spark 2.0,hadoop 2.7.2),但由於某種原因,我們在EMR中收到了一個相當神祕的NPE。作爲一個scala程序,一切都運行得很好,所以我們不確定是什麼原因造成了這個問題。這裏的堆棧跟蹤:作爲spark工作提交時,Spark RDD映射中的NullPointerException

18:02:55271 ERROR utils的:91 - 中止任務 顯示java.lang.NullPointerException 在org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.agg_doAggregateWithKeys $(來源不明) 在org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.processNext(來源不明) 在org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 在org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8 $$ anon $ 1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator $$ anon $ 12.hasNext(Iterator.scala:438) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply $ mcV $ sp(WriterContainer.scala:253) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $ $ anonfun $ writeRows $ 1.apply(WriterContainer.scala:252) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply(WriterContainer.scala:252) at org.apache。 spark.util.Utils $ .tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258) at org.apache.spark.sql.execution .datasources.InsertIntoHadoopFsRelationCommand $$ anonfun $ run $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.sql.execution.datasource s .InsertIntoHadoopFsRelationCommand $$ anonfun $ run $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor。 runWorker(ThreadPoolExecutor.java:1142) 在java.util.concurrent.ThreadPoolExecutor中的$ Worker.run(ThreadPoolExecutor.java:617) 在java.lang.Thread.run(Thread.java:745)

據我們所知,這種情況發生在以下方法中:

def process(dataFrame: DataFrame, S3bucket: String) = { 
    dataFrame.map(row => 
     "text|label" 
).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket) 
} 

我們已經收窄,到地圖的功能,因爲這時候的火花作業提交作品:

def process(dataFrame: DataFrame, S3bucket: String) = { 
    dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket) 
} 

沒有人有任何想法可能會造成這個問題?另外,我們如何解決它?我們很難過。

+0

你沒試過'coalesce()'嗎? – gsamaras

+0

@gsamaras不!但它似乎沒有合併就行。這裏發生了什麼? – cscan

回答

5

我想你會得到一個NullPointerException當工作人員試圖訪問只存在於驅動程序而不是工作者的SparkContext對象時拋出的。

coalesce()重新分區您的數據。當你只請求一個分區時,它會試圖擠壓全部數據在一個分區*。這可能會給應用程序的內存足夠大帶來壓力。

一般來說,最好不要只將分區縮小到1。

欲瞭解更多,請閱讀:Spark NullPointerException with saveAsTextFilethis


+0

我們使用coalesce(1)的原因是將所有數據寫入單個文件而不是多個文件。有沒有其他方法可以實現這一點? – cscan

+0

@cscan no。也許增加你的內存設置可以讓你的應用程序使用1分區,但是我發佈的錯誤並不表示這樣的事情。爲什麼你希望他們在1個文件中有一個原因嗎? – gsamaras

+1

當我們只用五條記錄進行測試時發生了這個錯誤 - 我不認爲它與內存使用有關。 – cscan