2017-02-26 89 views
3

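Kafka - Spark Streaming - reading data from only one partition

I have a standalone Spark cluster that reads data from a Kafka queue. The Kafka queue has 5 partitions, but Spark only processes data from one of them. I am using the following setup.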

Here are my Maven dependencies:

<dependencies> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
     <version>2.0.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.0.2</version> 
    </dependency> 
    <dependency> 
     <groupId>kafka-custom</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.1.1</version> 
    </dependency> 
</dependencies>

My Kafka producer is a simple one that just puts 100 messages on the queue:

public void generateMessages() { 

    // Define the properties for the Kafka Connection 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", kafkaBrokerServer); // kafka server 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer", 
      "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", 
      "org.apache.kafka.common.serialization.StringSerializer"); 

    // Create a KafkaProducer using the Kafka Connection properties 
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
      props); 
    for (int i = 0; i < 100; i++) { 
     ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopic, "value-" + i); 
     producer.send(record); 
    } 
    producer.close(); 

} 

Here is the main code of my Spark Streaming job:

public void processKafka() throws InterruptedException { 
    LOG.info("************ SparkStreamingKafka.processKafka start"); 

    // Create the spark application 
    SparkConf sparkConf = new SparkConf(); 
    sparkConf.set("spark.executor.cores", "5"); 

    //To express any Spark Streaming computation, a StreamingContext object needs to be created. 
    //This object serves as the main entry point for all Spark Streaming functionality. 
    //This creates the spark streaming context with a 'numSeconds' second batch size 
    jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchInterval)); 


    //List of parameters 
    Map<String, Object> kafkaParams = new HashMap<>(); 
    kafkaParams.put("bootstrap.servers", this.getBrokerList()); 
    kafkaParams.put("client.id", "SpliceSpark"); 
    kafkaParams.put("group.id", "mynewgroup"); 
    kafkaParams.put("auto.offset.reset", "earliest"); 
    kafkaParams.put("enable.auto.commit", false); 
    kafkaParams.put("key.deserializer", StringDeserializer.class); 
    kafkaParams.put("value.deserializer", StringDeserializer.class); 

    List<TopicPartition> topicPartitions= new ArrayList<TopicPartition>(); 
    for(int i=0; i<5; i++) { 
     topicPartitions.add(new TopicPartition("mytopic", i)); 
    } 


    //List of kafka topics to process 
    Collection<String> topics = Arrays.asList(this.getTopicList().split(",")); 


    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
      jssc, 
      LocationStrategies.PreferConsistent(), 
      ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
     ); 

    //Another version of an attempt 
    /* 
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
     jssc, 
     LocationStrategies.PreferConsistent(), 
     ConsumerStrategies.<String, String>Assign(topicPartitions, kafkaParams) 
    ); 
    */ 

    messages.foreachRDD(new PrintRDDDetails()); 


    // Start running the job to receive and transform the data 
    jssc.start(); 

    //Allows the current thread to wait for the termination of the context by stop() or by an exception 
    jssc.awaitTermination(); 
} 

The call method of PrintRDDDetails is as follows:

public void call(JavaRDD<ConsumerRecord<String, String>> rdd) 
     throws Exception { 

    LOG.error("--- New RDD with " + rdd.partitions().size() 
      + " partitions and " + rdd.count() + " records"); 

} 

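What seems to happen is that it only gets data from one partition. I have confirmed in Kafka that there are 5 partitions. When the call method executes, it prints the correct number of partitions, but only the records from 1 partition. Further processing, which I removed from this simplified code, also shows that only 1 partition is being processed.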
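One way to make the symptom above easier to see is to log a record count per Kafka partition for each batch instead of a single total. The sketch below only illustrates the arithmetic and uses the JDK alone: `Range` is a hypothetical stand-in for the offset range of one partition in a batch, and the sample numbers in `main` are made up, not output from the job above. In the real job the ranges would presumably come from the RDD itself (the Kafka 0-10 integration exposes them through `HasOffsetRanges`).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartitionCounts {

    // Hypothetical stand-in for one partition's offset range in a batch.
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;  // inclusive
        final long untilOffset; // exclusive

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        // Number of records covered by this range.
        long count() {
            return untilOffset - fromOffset;
        }
    }

    // Sums the record counts per partition number, sorted by partition.
    static Map<Integer, Long> countByPartition(List<Range> ranges) {
        Map<Integer, Long> counts = new TreeMap<>();
        for (Range r : ranges) {
            counts.merge(r.partition, r.count(), Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up batch: five partitions, but only partition 0 has records.
        List<Range> ranges = Arrays.asList(
                new Range("mytopic", 0, 0, 20),
                new Range("mytopic", 1, 0, 0),
                new Range("mytopic", 2, 0, 0),
                new Range("mytopic", 3, 0, 0),
                new Range("mytopic", 4, 0, 0));

        countByPartition(ranges).forEach((partition, n) ->
                System.out.println("partition " + partition + ": " + n + " records"));
    }
}
```

A batch in which every partition but one reports zero records, while the producer has written to all of them, would match the behaviour described here.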

Answer

4

This seems to be an issue with Spark 2.1.0, because it uses v0.10.1 of the kafka-clients library (as per the following pull request):

https://github.com/apache/spark/pull/16278

I worked around this by using a newer version of the kafka-clients library:

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"     % sparkVersion, 
    "org.apache.spark" %% "spark-streaming"    % sparkVersion, 
    "org.apache.spark" %% "spark-sql"     % sparkVersion, 
    "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion, 
    "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion
).map(_.exclude("org.apache.kafka", "kafka-clients")) 

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.0" 
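
Since the question builds with Maven rather than sbt, the same workaround might look roughly like the fragment below. This is a sketch and was not confirmed in the thread: the Spark version is copied from the question's pom, and the exclusion assumes the Kafka client arrives transitively as org.apache.kafka:kafka-clients, as in the sbt snippet above. The question's own kafka-custom entry for kafka-clients 0.10.1.1 would presumably be replaced by the second dependency.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.0.2</version>
    <exclusions>
        <!-- Drop the transitive Kafka client so the version below is used -->
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

Running mvn dependency:tree afterwards is one way to check which kafka-clients version actually ends up on the classpath.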

Thanks - I will try it and let you know. – Erin


Param - it worked! Thank you very much. – Erin


Awesome! I'm glad :) – Param
