2016-04-27 96 views
0

我有一個簡單的程序,因爲我試圖使用kafka接收數據。當我開始一個卡夫卡製作人併發送數據時,例如:「你好」,我在打印信息時得到這個信息:(null, Hello)。我不知道爲什麼會出現null。有什麼辦法可以避免這個null?我認爲這是第一個參數Tuple2<String, String>,但我只想打印第二個參數。另一件事,當我用System.out.println("inside map "+ message);打印時,它沒有出現任何信息,有人知道爲什麼嗎?謝謝。從卡夫卡火花流中的空值

public static void main(String[] args){ 

    SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]"); 
    // Substitute 127.0.0.1 with the actual address of your Spark Master (or use "local" to run in local mode 
    sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); 
    // Create the context with 2 seconds batch size 
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 

    Map<String, Integer> topicMap = new HashMap<>(); 
    String[] topics = KafkaProperties.TOPIC.split(","); 
    for (String topic: topics) { 
     topicMap.put(topic, KafkaProperties.NUM_THREADS); 
    } 
    /* connection to cassandra */ 
    CassandraConnector connector = CassandraConnector.apply(sparkConf); 
    System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++"); 

    /* Receive kafka inputs */ 
    JavaPairReceiverInputDStream<String, String> messages = 
      KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap); 
    System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++"); 

    JavaDStream<String> lines = messages.map(
      new Function<Tuple2<String, String>, String>() { 
       public String call(Tuple2<String, String> message) { 
        System.out.println("inside map "+ message); 
        return message._2(); 
       } 
      } 
    ); 

    messages.print(); 
    jssc.start(); 
    jssc.awaitTermination(); 
} 

回答

0

Q1)空值: 消息卡夫卡是有方向性的,這意味着它們都具有一個(鍵,值)結構。 當您看到(null, Hello)是因爲生產者在主題中發佈了(null,"Hello")值。 如果你想忽略你的過程中的關鍵,映射原Dtream刪除鍵:kafkaDStream.map(new Function<String,String>() {...})

Q2)System.out.println("inside map "+ message);不打印。一對夫婦的經典原因:

  1. 變換是執行人施加,所以在羣集中運行時,該輸出將顯示在執行程序和不在主站。

  2. 操作是懶惰的,DStreams需要物化以應用操作。

在這種特定情況下,JavaDStream<String> lines永遠不會實現,即不用於輸出操作。因此map從不執行。

+0

好的,謝謝。對於Q1,我怎麼能省略Java中的密鑰(我不熟悉Scala)。而對於第二季度,我能做些什麼來打印信息?再次感謝 –

+0

Q1->使用地圖功能。有很多例子。 Q2 - >和'messages' DStream一樣。 – maasg

+0

好吧,我正在尋找Q1的地圖功能,我希望找到一些東西。對於第二季度,我不瞭解你,我必須做什麼? –