
Spark fails with a "Task not serializable" exception when the UDF is used in a filter:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2.apply(DateFormatConstraint.scala:32)
    at org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2.apply(DateFormatConstraint.scala:16)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1$$anonfun$3.apply(Runner.scala:22)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1$$anonfun$3.apply(Runner.scala:22)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1.apply(Runner.scala:22)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1.apply(Runner.scala:20)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.exadatum.ddq.core.Runner$.run(Runner.scala:20)
    at org.exadatum.ddq.core.RunCheck.<init>(RunCheck.scala:104)
    at org.exadatum.ddq.core.DQJobTrigger$.main(DQJobTrigger.scala:39)
    at org.exadatum.ddq.core.DQJobTrigger.main(DQJobTrigger.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1d9bd4d6)
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint, name: sc, type: class org.apache.spark.SparkContext)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint, DateFormatConstraint(startdate, java.text.SimpleDateFormat@4f76f1a0, org.apache.spark.SparkContext@1d9bd4d6, xdqdemo.customer_details))
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2, name: $outer, type: class org.exadatum.ddq.constraints.DateFormatConstraint)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2, )
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2$$anonfun$3, name: $outer, type: class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2$$anonfun$3, )
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, )
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(startdate#2))
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(UDF(startdate#2)))
    - field (class: org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan, name: predicates, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan, InMemoryColumnarTableScan [phone_number#0,name#1,startdate#2], [UDF(startdate#2)], InMemoryRelation [phone_number#0,name#1,startdate#2], true, 10000, StorageLevel(false, true, false, true, 1), ConvertToUnsafe, None)
    - field (class: org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan$$anonfun$doExecute$1, name: $outer, type: class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan)
    - object (class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan$$anonfun$doExecute$1, )
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, )
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, )
    - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[8] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@316975be)
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@316975be))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[9] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@...))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[10] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, )
    - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 39 more

Code snippet:

val fun = (df: DataFrame) => { 

format.setLenient(false) 
val cannotBeDate = udf((column: String) => column != null && Try(format.parse(column)).isFailure) 
val maybeCannotBeDateCount = Try(df.filter(cannotBeDate(new Column(columnName))).count); 

/** Utility to persist all of the bad records **/ 

val hiveContext = new HiveContext(sc) 
import hiveContext.implicits._ 

//Writing all Bad records 
//val intermediateWriteToHiveDf = df.filter(cannotBeDate(new Column(columnName))) 
val writeToHiveDf = df.filter(cannotBeDate(new Column(columnName))) 

var recordLists = new ListBuffer[List[(String, String, String)]]() 
writeToHiveDf.rdd.collect().foreach { 
    row => 
    val item = row.mkString("-") 
    val recordList: List[(String, String, String)] = List(List(tableName, "ALWAYS_NULL_CONSTRAINT", item)) 
     .map { case List(a, b, c) => (a, b, c) } 
    recordLists += recordList 
} 
val listRDD = sc.parallelize(recordLists.flatten) 
val dataFrameToHive: DataFrame = listRDD.toDF("table_name", "constraint_applied", "data") 
dataFrameToHive.write.mode("append").saveAsTable("xdqdemo.bad_records") 



DateFormatConstraintResult(
    this, 
    data = maybeCannotBeDateCount.toOption.map(DateFormatConstraintResultData), 
    status = ConstraintUtil.tryToStatus[Long](maybeCannotBeDateCount, _ == 0) 
) 

}


I'm guessing it is the `format.parse` inside the UDF that causes the problem. You haven't included the initialization of the `format` variable in the snippet, but either the class of `format` itself is not serializable, or the class in which you are initializing `format` is not serializable (most likely the latter).
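A minimal sketch of the capture pattern described here, using hypothetical names (DateCheck stands in for the DateFormatConstraint case class shown in the serialization stack, which holds a SparkContext field): referencing a field such as `format` inside the UDF body makes the closure capture the whole enclosing instance, and the non-serializable SparkContext is dragged along with it.

import java.text.SimpleDateFormat
import scala.util.Try
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.udf

case class DateCheck(columnName: String, format: SimpleDateFormat, sc: SparkContext) {
  def badRows(df: DataFrame): DataFrame = {
    // `format` and `columnName` are fields of `this`, so the UDF closure serializes the whole
    // DateCheck instance -- and with it the non-serializable SparkContext field `sc`.
    val cannotBeDate = udf((s: String) => s != null && Try(format.parse(s)).isFailure)
    // Fails with "Task not serializable" once an action (count/collect) runs on the result
    df.filter(cannotBeDate(new Column(columnName)))
  }
}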


format is a parameter; it is initialized as format = SimpleDateFormat("some-date-format");


Thanks for the reply.

Answer

object checkConstraint extends Serializable {
  def checkDateFormat(format: SimpleDateFormat, df: DataFrame): DataFrame = {
    format.setLenient(false)
    // True when the column value cannot be parsed with the supplied date format
    val checkDateFormat = (column: String) => Try(format.parse(column)).isFailure
    val cannotBeDate = udf((column: String) => column != null && checkDateFormat(column))
    // Keep only the rows whose value is not a valid date
    df.filter(cannotBeDate(new Column(columnName)))
  }
}


val writeToHiveDf = checkConstraint.checkDateFormat(format,df) 

So all of the computation is packed into a single serializable object, which returns the required DataFrame.
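For the bad-record persistence part of the question, the DataFrame returned by checkConstraint.checkDateFormat can then be written out directly instead of being collected to the driver. This is only a sketch under the question's own assumptions (tableName, format and the xdqdemo.bad_records table come from the original snippet; concat_ws reproduces the row.mkString("-") concatenation):

import org.apache.spark.sql.functions.{col, concat_ws, lit}

val writeToHiveDf = checkConstraint.checkDateFormat(format, df)

// Tag every bad row with the table name and the constraint that flagged it,
// building the rows on the executors instead of in a driver-side ListBuffer.
val dataFrameToHive = writeToHiveDf
  .select(concat_ws("-", writeToHiveDf.columns.map(col): _*).as("data"))
  .withColumn("table_name", lit(tableName))
  .withColumn("constraint_applied", lit("ALWAYS_NULL_CONSTRAINT"))
  .select("table_name", "constraint_applied", "data")

dataFrameToHive.write.mode("append").saveAsTable("xdqdemo.bad_records")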
