
Broadcasting a KafkaProducer in a Spark-Kafka integration: I am trying to read from Kafka with Spark and push the data to another Kafka queue.

My initial approach was to create a KafkaProducer object for every record inside each RDD partition. That works, but the performance is very poor.

So I tried the broadcast-variable approach: make the KafkaProducer a broadcast variable and pass it to the executors. That ends with: Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException

Please explain or correct my Spark code so that it uses KafkaProducer the right way and with better performance. (A sketch of the alternative I am considering is after the code below.)

    import java.io.Serializable; 
    import java.util.HashMap; 
    import java.util.Map; 
    import java.util.Properties; 
    import java.util.Set; 
    import java.util.TreeSet; 
    import java.util.concurrent.Future; 

    import org.apache.commons.configuration.ConfigurationConverter; 
    import org.apache.commons.configuration.ConfigurationException; 
    import org.apache.commons.configuration.PropertiesConfiguration; 
    import org.apache.kafka.clients.producer.KafkaProducer; 
    import org.apache.kafka.clients.producer.Producer; 
    import org.apache.kafka.clients.producer.ProducerRecord; 
    import org.apache.kafka.clients.producer.RecordMetadata; 
    import org.apache.spark.api.java.JavaSparkContext; 
    import org.apache.spark.api.java.function.Function; 
    import org.apache.spark.broadcast.Broadcast; 
    import org.apache.spark.streaming.Duration; 
    import org.apache.spark.streaming.api.java.JavaInputDStream; 
    import org.apache.spark.streaming.api.java.JavaStreamingContext; 
    import org.apache.spark.streaming.kafka.KafkaUtils; 

    import kafka.common.TopicAndPartition; 
    import kafka.message.MessageAndMetadata; 
    import kafka.serializer.StringDecoder; 

    public class MyService implements Serializable { 


     private static final long serialVersionUID = 1L; 
     private PropertiesConfiguration props; 
     private Producer<String, String> producer = null; 
     private Future<RecordMetadata> receipt = null; 
     private RecordMetadata receiptInfo = null; 

     public void setProperties() { 

      try { 
       props = new PropertiesConfiguration("/conf/empty.properties"); 
      } catch (ConfigurationException e) { 
       // TODO Auto-generated catch block 
       System.out.println("Line 51"); 
       e.printStackTrace(); 
      } 

      if (!props.containsKey("producer.topic")) { 
       props.setProperty("producer.topic", "mytopic"); 
      } 

      Properties producerprops = ConfigurationConverter.getProperties(props); 

      producerprops.setProperty("bootstrap.servers", props.getString("target.bootstrap.servers")); 
      producerprops.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
      producerprops.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // ???? 

      this.producer = new KafkaProducer<String, String>(producerprops); 

     } 

     public void sendmessage(String Value) { 

      try { 
       System.out.println("Line 111"); 

       String key = "xyz"; 

       if ("20".equals(Value)) { 
        receipt = producer 
          .send(new ProducerRecord<String, String>(props.getString("producer.topic"), key, Value)); 
       } 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 

     } 


     public static void main(String[] args) { 
      String topicNames = "mysourcetopic"; 
      Set<String> topicSet = new TreeSet<String>(); 
      for (String topic : topicNames.split(",")) { 
       topicSet.add(topic.trim()); 
      } 

      Map<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>(); 
      for (String topic : topicNames.split(",")) { 
       for (int i = 0; i < 2; i++) { 

        TopicAndPartition tp = new TopicAndPartition(topic, i); 
        topicMap.put(tp, 0l); 
       } 
      } 

      JavaSparkContext sparkConf = new JavaSparkContext("**************", "Kafka-Spark"); 

      MyService ec = new MyService(); 
      ec.setProperties(); 

      final Broadcast<Producer<String, String>> bCastProducer = sparkConf.broadcast(ec.producer); 

      sparkConf.getConf().set("spark.local.ip", "abcddd"); 

      sparkConf.getConf().set("spark.eventLog.enabled", "false"); 
      sparkConf.getConf().set("spark.shuffle.blockTransferService", "nio"); 

      JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(10000)); 

      Map<String, String> kafkaParams = new HashMap<String, String>(); 
      String pollInterval = "10000"; 
      String zookeeper = "xyzzz"; 
      int partition = 1; 
      kafkaParams.put("metadata.broker.list", "xyzzz"); 
      kafkaParams.put("group.id", "Consumer"); 
      kafkaParams.put("client.id", "Consumer"); 
      kafkaParams.put("zookeeper.connect", zookeeper); 
      JavaInputDStream<String> dfs = KafkaUtils.createDirectStream(jsc, String.class, String.class, 
        StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicMap, 
        (Function<MessageAndMetadata<String, String>, String>) MessageAndMetadata::message); 

      dfs.foreachRDD(rdd -> { 
       if (rdd.isEmpty()) { 
        return; 
       } 

       rdd.foreachPartition(itr -> { 
        try { 
         // System.out.println("231"); 

         while (itr.hasNext()) { 
          ec.sendmessage(itr.next()); // Produce 

         } 

        } catch (Exception e) { 
        } 
       }); 
      }); 
      jsc.start(); 
      jsc.awaitTermination(); 
     } 

    } 
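
For comparison, this is the direction I am thinking about instead of the broadcast: create the producer lazily, once per executor JVM, and reuse it across partitions, so nothing non-serializable ever has to leave the driver. The ProducerHolder class, the broker address and the hard-coded topic/key below are placeholders for illustration, not my real configuration:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Placeholder helper: one KafkaProducer per executor JVM, built lazily on first use.
    class ProducerHolder {

        private static Producer<String, String> producer;

        static synchronized Producer<String, String> get() {
            if (producer == null) {
                Properties p = new Properties();
                p.setProperty("bootstrap.servers", "broker1:9092"); // placeholder broker list
                p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                producer = new KafkaProducer<>(p);
            }
            return producer;
        }
    }

    // The streaming part of main() would then use the holder instead of bCastProducer:
    dfs.foreachRDD(rdd -> {
        rdd.foreachPartition(itr -> {
            Producer<String, String> producer = ProducerHolder.get(); // created once per JVM, then reused
            while (itr.hasNext()) {
                producer.send(new ProducerRecord<>("mytopic", "xyz", itr.next()));
            }
        });
    });

I have also seen the variant where a small Serializable wrapper that builds the producer lazily is broadcast instead of the producer itself. Which of these is the recommended way?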
