Spark task serialization and closures

When a lambda passed to a Spark RDD operation references objects outside its own scope, those objects are pulled into the serialized task that is shipped out for distributed execution. In the simple example below, why does Spark decide to serialize the entire OuterClass instance rather than just multiplier? I suspect that multiplier is actually a Scala getter method under the hood, so the closure must hold a reference to the enclosing instance. Declaring OuterClass extends Serializable works, but it introduces an unnecessary constraint. I would really appreciate a way to make this work without declaring OuterClass serializable.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ClosureTest {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("test"))
    println(new OuterClass(10).sparkSumProd(sc.parallelize(Seq(1, 2, 3))))
  }

  class OuterClass(multiplier: Int) {
    def sparkSumProd(data: RDD[Int]): Double = {
      data.map { v => v * multiplier }.sum()
    }
  }
}
Below is the output from Spark's SerializationDebugger:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2056)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
at ClosureTest$OuterClass.sparkSumProd(ClosureTest.scala:14)
at ClosureTest$.main(ClosureTest.scala:10)
at ClosureTest.main(ClosureTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: ClosureTest$OuterClass
Serialization stack:
- object not serializable (class: ClosureTest$OuterClass, value: [email protected])
- field (class: ClosureTest$OuterClass$$anonfun$sparkSumProd$1, name: $outer, type: class ClosureTest$OuterClass)
- object (class ClosureTest$OuterClass$$anonfun$sparkSumProd$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 17 more
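For reference, a common workaround, assuming the getter-capture behavior suspected above (the `$outer` field in the serialization stack is the reference to the enclosing instance), is to copy the field into a local val inside the method. The closure then captures only that local Int instead of `this`. A minimal sketch of the rewritten method:

```scala
import org.apache.spark.rdd.RDD

class OuterClass(multiplier: Int) {
  def sparkSumProd(data: RDD[Int]): Double = {
    // Copy the getter-backed field into a local val so the closure
    // captures only this Int, not the enclosing (non-serializable)
    // OuterClass instance.
    val localMultiplier = multiplier
    data.map { v => v * localMultiplier }.sum()
  }
}
```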