2015-02-05

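Kafka Spark Streaming Java API problem

I am a beginner with Kafka and Spark. I want to use Spark Streaming to process, in real time, the data received from Kafka on a specific topic. However, I can't work with the JavaPairReceiverInputDStream returned by the createStream function.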

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("testwordCount");
// 1-second micro-batches
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

// topic name -> number of partitions to consume (each consumed in its own thread)
Map<String, Integer> topics_map = new HashMap<String, Integer>();
topics_map.put("Customtopic", 10);

// arguments: streaming context, ZooKeeper quorum, consumer group id, topic map
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
        .createStream(jssc, "localhost:2181", "kafkasparkconsumer", topics_map);

The following code gives an error:

JavaPairDStream<String, Integer> wordCounts = kafkaStream.map(
        new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
wordCounts.print();

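The method map(Function<Tuple2<String,String>,R>) in the type JavaPairDStream<String,String> is not applicable for the arguments (new PairFunction<String,String,Integer>(){})    SparkStreamingKafka.java    /Kafka-Spark/src/com/sd/kafka    line 43    Java Problem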

I am using Spark 1.2.0. I couldn't find any Java API example for processing Kafka messages. Can anyone tell me what I need to change?

Answers


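You are calling the wrong method. In the Java API, if you want to produce pairs, you have to call mapToPair, not map. Also note that each element of kafkaStream is a Tuple2<String, String> (key, message), not a String. Try this code: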

JavaPairDStream<String, Integer> pairs = kafkaStream 
      .mapToPair(new PairFunction<Tuple2<String, String>, String, Integer>() { 
       @Override public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception { 
        return new Tuple2<String, Integer>(word._2(), 1); 
       } 
      }).reduceByKey(new Function2<Integer, Integer, Integer>() { 
       @Override public Integer call(Integer integer, Integer integer2) throws Exception { 
        return integer + integer2; 
       } 
      }); 

    pairs.print(); 

    jssc.start(); 
    jssc.awaitTermination(); 
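Note that this mapToPair keys each pair on the whole message (word._2()), so within each batch it counts identical messages rather than individual words. The sketch below reproduces what mapToPair + reduceByKey compute for a single micro-batch in plain Java, without Spark, so the logic can be run and checked standalone. The class and method names (MessageCountSketch, entry, countMessages) are hypothetical, and modelling a Kafka record as a Map.Entry is an assumption standing in for Spark's Tuple2<String, String>.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch (no Spark on the classpath): what the answer's
// mapToPair + reduceByKey compute for ONE micro-batch of Kafka records.
public class MessageCountSketch {

    // Assumption: one Kafka record is modelled as a (key, message) entry,
    // standing in for Spark's Tuple2<String, String>.
    public static Map.Entry<String, String> entry(String key, String message) {
        return new AbstractMap.SimpleEntry<String, String>(key, message);
    }

    // mapToPair: (key, message) -> (message, 1); reduceByKey: sum the 1s per message.
    public static Map<String, Integer> countMessages(List<? extends Map.Entry<String, String>> batch) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Map.Entry<String, String> kafkaRecord : batch) {
            String message = kafkaRecord.getValue(); // same role as word._2()
            counts.merge(message, 1, Integer::sum);  // same role as integer + integer2
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> batch = Arrays.asList(
                entry(null, "hello"), // the key is ignored by this sketch
                entry(null, "world"),
                entry(null, "hello"));
        System.out.println(countMessages(batch)); // prints {hello=2, world=1}
    }
}
```

In the real stream, Spark applies the same per-key reduction to every 1-second batch independently, so counts do not accumulate across batches unless a stateful operation is added.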

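Each element of kafkaStream is a tuple (Tuple2<String, String> of key and message), so you first need to extract the message. Check this: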

JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils 
       .createStream(jssc, "localhost:2181", "kafkasparkconsumer", 
         topics_map); 
JavaDStream<String> lines = kafkaStream 
       .map(new Function<Tuple2<String, String>, String>() { 
        // Keep only the message value (tuple._2()); the key (tuple._1()) is dropped.
        private static final long serialVersionUID = 1L; 

        @Override 
        public String call(Tuple2<String, String> tuple2) { 
         return tuple2._2(); 
        } 
       });
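Since the application is named testwordCount, the extracted lines would typically be split into words (for example with flatMap) and then counted with mapToPair and reduceByKey. The sketch below shows that three-step pipeline over one batch of lines in plain Java, without Spark, so it runs standalone. The class name LineWordCountSketch and splitting on whitespace are assumptions for illustration, not part of either answer.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch (no Spark): word count over ONE batch of message lines.
public class LineWordCountSketch {

    public static Map<String, Integer> wordCounts(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {                 // lines = result of map(tuple -> tuple._2())
            for (String word : line.split("\\s+")) { // role of flatMap: one line -> many words
                if (!word.isEmpty()) {               // split can yield "" for leading whitespace
                    counts.merge(word, 1, Integer::sum); // role of mapToPair(w, 1) + reduceByKey(+)
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("to be or", "not to be");
        System.out.println(wordCounts(batch)); // prints {to=2, be=2, or=1, not=1}
    }
}
```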