1

我沒有從使用Kafka直接流的隊列中獲取任何數據。在我的代碼中,我把System.out.println()這個語句不運行,這意味着我沒有從該主題獲取任何數據..使用Java Spark從卡夫卡主題獲取的值 - kafka direct stream

我很確定隊列中的數據可用,因爲沒有獲取控制檯。

我沒有在控制檯中看到任何錯誤。

任何人都可以請建議一些東西嗎?

這裏是我的Java代碼,

SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]"); 
     sparkConf.set("spark.streaming.concurrentJobs", "3"); 

     // Create the context with 2 seconds batch size 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(3000)); 

     Map<String, Object> kafkaParams = new HashMap<>(); 
     kafkaParams.put("bootstrap.servers", "x.xx.xxx.xxx:9092"); 
     kafkaParams.put("key.deserializer", StringDeserializer.class); 
     kafkaParams.put("value.deserializer", StringDeserializer.class); 
     kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); 
     kafkaParams.put("auto.offset.reset", "latest"); 
     kafkaParams.put("enable.auto.commit", true); 

     Collection<String> topics = Arrays.asList("topicName"); 

     final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc, 
       LocationStrategies.PreferConsistent(), 
       ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)); 


     JavaPairDStream<String, String> lines = stream 
       .mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() { 
        @Override 
        public Tuple2<String, String> call(ConsumerRecord<String, String> record) { 

         return new Tuple2<>(record.key(), record.value()); 
        } 
       }); 

     lines.print(); 

     // System.out.println(lines.count()); 
     lines.foreachRDD(rdd -> { 
      rdd.values().foreachPartition(p -> { 
       while (p.hasNext()) { 
        System.out.println("Value of Kafka queue" + p.next()); 
       } 
      }); 
     }); 
+0

兩個思路進行檢查:1)做新的數據流進你的話題?默認情況下,您只會收到比您的工作更新的數據。否則,將auto.offset.reset設置爲「最早的」2)bootstrap.servers需要與kafka發佈的值完全匹配(請參閱kafka broker配置)。如果經紀人公佈其DNS名稱,並嘗試通過IP地址進行連接,則您將收到不是數據,但無錯誤 –

+0

您是否在您的POM中添加了spark-streaming-kafka jar? – user4342532

+0

我以前在這個集成工作。如果你想任何幫助只是分享你的電子郵件 – user4342532

回答

0

@Vimal這裏是一個link在斯卡拉創造直接流的工作版本。

我相信在Scala中查看它之後,你必須很容易地轉換它。

請確保您關閉以閱讀卡夫卡的最新主題。它可能不會選擇上次處理的任何主題。

+0

我評論到「kafkaParams.put(」auto.offset.reset「,」latest「 );」仍然無法得到結果 –

+0

Scala代碼和Java代碼是完全不同的..我沒有得到任何解決方案從給定的Scala –

1

我能夠打印的使用直接卡夫卡流從卡夫卡隊列取串..

這裏是我的代碼,

import java.util.HashMap; 
import java.util.HashSet; 
import java.io.IOException; 
import java.nio.file.Files; 
import java.nio.file.Paths; 
import java.nio.file.StandardOpenOption; 
import java.util.Arrays; 
import java.util.Calendar; 
import java.util.Collection; 
import java.util.Currency; 
import java.util.Iterator; 
import java.util.List; 
import java.util.Map; 
import java.util.Set; 
import java.util.concurrent.atomic.AtomicReference; 
import java.util.regex.Pattern; 

import scala.Tuple2; 

import kafka.serializer.StringDecoder; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.*; 
import org.apache.spark.streaming.api.java.*; 
import org.apache.spark.streaming.kafka.HasOffsetRanges; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.apache.spark.streaming.kafka.OffsetRange; 
import org.json.JSONObject; 
import org.omg.CORBA.Current; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.Durations; 

public final class KafkaConsumerDirectStream { 

    public static void main(String[] args) throws Exception { 

     try { 
      SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]"); 
      sparkConf.set("spark.streaming.concurrentJobs", "30"); 

      // Create the context with 2 seconds batch size 
      JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(200)); 

      Map<String, String> kafkaParams = new HashMap<>(); 
      kafkaParams.put("metadata.broker.list", "x.xx.xxx.xxx:9091"); 

      Set<String> topics = new HashSet(); 
      topics.add("PartWithTopic02Queue"); 

      JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, 
        String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 

      JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
       @Override 
       public String call(Tuple2<String, String> tuple2) { 
        return tuple2._2(); 
       } 
      }); 

      lines.foreachRDD(rdd -> { 

       if (rdd.count() > 0) { 
        List<String> strArray = rdd.collect(); 

        // Print string here 
       } 
      }); 

      jssc.start(); 
      jssc.awaitTermination(); 
     } 
    } 
    catch (Exception e) { 
      e.printStackTrace(); 
     } 
}