
I am running a Spark Streaming job and want to publish my result_dstream, which is of type DStream[GenericData.Record], to Kafka. I am using the code below for this, but I get a Task not serializable exception when trying to publish to Kafka.

val prod_props : Properties = new Properties() 
prod_props.put("bootstrap.servers" , "localhost:9092") 
prod_props.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer") 
prod_props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer") 

val _producer : KafkaProducer[String , Array[Byte]] = new KafkaProducer(prod_props) 

result_DStream.foreachRDD(r => {
  r.foreachPartition(it => {
    while (it.hasNext) {
      val schema = new Schema.Parser().parse(schema_string)
      val recordInjection : Injection[GenericRecord , Array[Byte]] = GenericAvroCodecs.toBinary(schema)
      val record : GenericData.Record = it.next()
      val byte : Array[Byte] = recordInjection.apply(record)
      val prod_record : ProducerRecord[String , Array[Byte]] = new ProducerRecord("sample_topic_name_9" , byte)
      _producer.send(prod_record)
    }
  })
})

What can I do to fix this? I have already tried using the non-serializable classes inside the lambda and using foreachPartition instead of foreach, and as far as I can tell the problem is with schema and recordInjection.

org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2062) 
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919) 
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) 
at HadoopMetrics_Online$$anonfun$main$3.apply(HadoopMetrics_Online.scala:187) 
at HadoopMetrics_Online$$anonfun$main$3.apply(HadoopMetrics_Online.scala:186) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at scala.util.Try$.apply(Try.scala:161) 
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer 
Serialization stack: 
- object not serializable (class: org.apache.kafka.clients.producer.KafkaProducer, value: org.apache.kafka.clients.producer.KafkaProducer@252f5489) 
- field (class: HadoopMetrics_Online$$anonfun$main$3, name: _producer$1, type: class org.apache.kafka.clients.producer.KafkaProducer) 
- object (class HadoopMetrics_Online$$anonfun$main$3, <function1>) 
- field (class: HadoopMetrics_Online$$anonfun$main$3$$anonfun$apply$1, name: $outer, type: class HadoopMetrics_Online$$anonfun$main$3) 
- object (class HadoopMetrics_Online$$anonfun$main$3$$anonfun$apply$1, <function1>) 
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
... 30 more 

Adding the stack trace would help – hadooper


Added the stack trace – JSR29

Answer


KafkaProducer is not serializable, and you are closing over it in your foreachPartition call. You need to declare it inside the partition:

resultDStream.foreachRDD(r => {
  r.foreachPartition(it => {
    // Create the producer on the executor, inside the partition closure,
    // so it is never serialized and shipped from the driver.
    val producer : KafkaProducer[String , Array[Byte]] = new KafkaProducer(prod_props)
    // Parse the schema and build the injection once per partition, not once per record.
    val schema = new Schema.Parser().parse(schema_string)
    val recordInjection : Injection[GenericRecord , Array[Byte]] = GenericAvroCodecs.toBinary(schema)
    while (it.hasNext) {
      val record : GenericData.Record = it.next()
      val byte : Array[Byte] = recordInjection.apply(record)
      val prod_record : ProducerRecord[String , Array[Byte]] = new ProducerRecord("sample_topic_name_9" , byte)
      producer.send(prod_record)
    }
    // Flush buffered records and release the connection before the task finishes.
    producer.close()
  })
})

Side note: the Scala naming convention for variables is camelCase, not snake_case.
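One further refinement, not part of the answer above: creating a new KafkaProducer for every partition of every micro-batch is relatively expensive. A common alternative is to keep a single lazily-initialized producer per executor JVM, for example inside a Scala object. The sketch below is illustrative only; ProducerHolder is a hypothetical helper that reuses the same properties as prod_props:

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

object ProducerHolder {
  // Created at most once per executor JVM, the first time a task touches it.
  lazy val producer: KafkaProducer[String, Array[Byte]] = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    new KafkaProducer[String, Array[Byte]](props)
  }
}

// Inside foreachPartition, call ProducerHolder.producer.send(...) instead of
// constructing a producer; only the object reference is captured by the closure,
// never the producer itself.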


How can I know in the future which classes' objects will be troublesome? – JSR29


@JSR29 Spark has a ClosureCleaner that tells you the exact field causing the problem; read the stack trace: 'object not serializable (class: org.apache.kafka.clients.producer.KafkaProducer, value: org.apache.kafka.clients.producer.KafkaProducer@252f5489)' –
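To make the point in that comment concrete: any non-serializable object captured from the driver produces the same failure, and the "Serialization stack" section of the error names the offending field. A minimal sketch, assuming an existing SparkContext named sc and a hypothetical class with no Serializable marker:

class NotSerializable

val ns = new NotSerializable
// The closure captures ns, so Spark tries to serialize it and fails with
// "Task not serializable"; the Serialization stack points at the ns field.
sc.parallelize(1 to 10).foreach(x => println(ns))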
