我沒有從使用Kafka直接流的隊列中獲取任何數據。在我的代碼中,我把System.out.println()這個語句不運行,這意味着我沒有從該主題獲取任何數據..使用Java Spark從卡夫卡主題獲取的值 - kafka direct stream
我很確定隊列中的數據可用,因爲沒有獲取控制檯。
我沒有在控制檯中看到任何錯誤。
任何人都可以請建議一些東西嗎?
這裏是我的Java代碼,
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]");
sparkConf.set("spark.streaming.concurrentJobs", "3");
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(3000));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "x.xx.xxx.xxx:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);
Collection<String> topics = Arrays.asList("topicName");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
JavaPairDStream<String, String> lines = stream
.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
return new Tuple2<>(record.key(), record.value());
}
});
lines.print();
// System.out.println(lines.count());
lines.foreachRDD(rdd -> {
rdd.values().foreachPartition(p -> {
while (p.hasNext()) {
System.out.println("Value of Kafka queue" + p.next());
}
});
});
兩個思路進行檢查:1)做新的數據流進你的話題?默認情況下,您只會收到比您的工作更新的數據。否則,將auto.offset.reset設置爲「最早的」2)bootstrap.servers需要與kafka發佈的值完全匹配(請參閱kafka broker配置)。如果經紀人公佈其DNS名稱,並嘗試通過IP地址進行連接,則您將收到不是數據,但無錯誤 –
您是否在您的POM中添加了spark-streaming-kafka jar? – user4342532
我以前在這個集成工作。如果你想任何幫助只是分享你的電子郵件 – user4342532