我有一個簡單的程序,因爲我試圖使用kafka
接收數據。當我開始一個卡夫卡製作人併發送數據時,例如:「你好」,我在打印信息時得到這個信息:(null, Hello)
。我不知道爲什麼會出現null。有什麼辦法可以避免這個null?我認爲這是第一個參數Tuple2<String, String>
,但我只想打印第二個參數。另一件事,當我用System.out.println("inside map "+ message);
打印時,它沒有出現任何信息,有人知道爲什麼嗎?謝謝。從卡夫卡火花流中的空值
public static void main(String[] args){
SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
// Substitute 127.0.0.1 with the actual address of your Spark Master (or use "local" to run in local mode
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String, Integer> topicMap = new HashMap<>();
String[] topics = KafkaProperties.TOPIC.split(",");
for (String topic: topics) {
topicMap.put(topic, KafkaProperties.NUM_THREADS);
}
/* connection to cassandra */
CassandraConnector connector = CassandraConnector.apply(sparkConf);
System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++");
/* Receive kafka inputs */
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap);
System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++");
JavaDStream<String> lines = messages.map(
new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> message) {
System.out.println("inside map "+ message);
return message._2();
}
}
);
messages.print();
jssc.start();
jssc.awaitTermination();
}
好的,謝謝。對於Q1,我怎麼能省略Java中的密鑰(我不熟悉Scala)。而對於第二季度,我能做些什麼來打印信息?再次感謝 –
Q1->使用地圖功能。有很多例子。 Q2 - >和'messages' DStream一樣。 – maasg
好吧,我正在尋找Q1的地圖功能,我希望找到一些東西。對於第二季度,我不瞭解你,我必須做什麼? –