
Object not serializable (org.apache.kafka.clients.consumer.ConsumerRecord) - Spark Kafka streaming in Java

I am quite sure that I am only pushing plain String data and deserializing it as String as well, yet the records I push still raise this error.

But why does it suddenly show this kind of error? Is there something I am missing?

Here is the code:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.regex.Pattern;

    import scala.Tuple2;

    // ConsumerRecord and TaskContext are referenced in the stream handling below
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import org.apache.spark.SparkConf;
    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.api.java.*;
    // the 0.10 integration package; it also provides HasOffsetRanges and OffsetRange
    import org.apache.spark.streaming.kafka010.*;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Durations;

public final class KafkaConsumerDirectStream { 
    public static void main(String[] args) throws Exception { 
     try { 
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]"); 
        sparkConf.set("spark.streaming.concurrentJobs", "3"); 

        // Create the context with 2 seconds batch size 
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 

        Map<String, Object> kafkaParams = new HashMap<>(); 
        // kafkaParams.put("metadata.broker.list", "x.xx.xxx.xxx:9091, 
        // x.xx.xxx.xxx:9092, x.xx.xxx.xxx:9093"); 

        kafkaParams.put("bootstrap.servers", "x.xx.xxx.xxx:9091"); 
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        kafkaParams.put("group.id", "11_ubiq_12dj"); 
        kafkaParams.put("enable.auto.commit", "true"); 
        kafkaParams.put("auto.commit.interval.ms", "1000"); 
        kafkaParams.put("session.timeout.ms", "30000"); 
        kafkaParams.put("auto.offset.reset", "earliest"); 
        kafkaParams.put("enable.auto.commit", true); 

        Collection<String> topics = Arrays.asList("TopicQueue"); 

        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc, 
          LocationStrategies.PreferBrokers(), 
          ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)); 

        //stream.print(); 


        stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() { 
         @Override 
         public void call(JavaRDD<ConsumerRecord<String, String>> rdd) { 
          final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
          rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() { 
           @Override 
           public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) { 
            OffsetRange o = offsetRanges[TaskContext.get().partitionId()]; 

            // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges); 
            System.out.println(
              o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()); 

           } 
          }); 
         } 
        }); 

        jssc.start(); 
        jssc.awaitTermination(); 
       } catch (Exception e) { 
        e.printStackTrace(); 
       }  
    } 
} 

The following error is raised:

16/11/24 00:19:14 ERROR JobScheduler: Error running job streaming job 1479964754000 ms.0 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 30.0 (TID 1500) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = PartWithTopic02Queue, partition = 36, offset = 555, CreateTime = 1479964753779, checksum = 2582644462, serialized key size = -1, serialized value size = 6, key = null, value = Hello0)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 1) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at java.lang.Thread.getStackTrace(Thread.java:1117) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
    at java.lang.Thread.run(Thread.java:785) 

Please ignore the commented-out code. –


The exception is raised while the job is running. –

Answers


It seems org.apache.spark.streaming.kafka010.* is not working properly here. I simply used org.apache.spark.streaming.kafka (the older 0.8 integration) instead, and that worked fine for me.
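
A minimal sketch of that approach with the 0.8 integration (this assumes the spark-streaming-kafka-0-8 artifact is on the classpath; the broker address and topic name are the placeholders from the question):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import kafka.serializer.StringDecoder;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    public final class OldApiDirectStream {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("OldApiDirectStream").setMaster("local[*]");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

            // The 0.8 integration takes a Map<String, String> keyed by "metadata.broker.list".
            Map<String, String> kafkaParams = new HashMap<>();
            kafkaParams.put("metadata.broker.list", "x.xx.xxx.xxx:9091");

            Set<String> topics = new HashSet<>(Arrays.asList("TopicQueue"));

            // Records arrive as plain String key/value pairs, so nothing
            // non-serializable (such as ConsumerRecord) ever has to be shipped.
            JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
                jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);

            stream.print();

            jssc.start();
            jssc.awaitTermination();
        }
    }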


The org.apache.kafka.clients.consumer.ConsumerRecord class is not serializable and cannot be used with RMI or the like.
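
Because the record object itself is not serializable, one common workaround (a sketch only, reusing the stream variable from the question's code) is to extract the plain String value from each ConsumerRecord on the executors before calling anything that moves data to the driver, such as print():

    // Map each ConsumerRecord to its String value; only serializable Strings
    // then have to cross the executor/driver boundary.
    JavaDStream<String> values = stream.map(
        new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> record) {
                return record.value();
            }
        });

    values.print(); // printing Strings is fine; printing ConsumerRecords is not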


Before this, it was working perfectly fine. Maybe I made some changes, and that is why it is not working now... –


@CodeIdenti, what do you mean by "before"? Maybe you were serializing an empty object? – AntJavaDev


It works fine when there is no data in "TopicQueue". But as soon as data arrives in "TopicQueue", this error appears. –


You just need to add public final class KafkaConsumerDirectStream implements java.io.Serializable. That worked for me, even while using org.apache.spark.streaming.kafka010.*.

Hope this helps, thanks :-)
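
For reference, the change that answer describes is only on the class declaration. It can help when a closure ends up capturing the enclosing class, but note that it does not make ConsumerRecord itself serializable:

    // Same class as in the question, with the Serializable marker interface added.
    public final class KafkaConsumerDirectStream implements java.io.Serializable {
        // ... rest of the class unchanged ...
    }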
