2017-08-04 157 views
0

嗨,大家好我正在與kafka> spark streaming> Elasticsearch合作。 但我不做火花流JavaInputDStream JSON elasticsearch。如何將JavaInputDStream JSON轉換爲ElasticSearch JAVA

我的代碼:

SparkConf conf = new SparkConf() 
      .setAppName("Streaming") 
      .setMaster("local") 
      .set("es.nodes","localhost:9200") 
      .set("es.index.auto.create","true"); 
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, new Duration(5000)); 
    Map<String, Object> kafkaParams = new HashMap<>(); 
    kafkaParams.put("bootstrap.servers", "localhost:9092"); 
    kafkaParams.put("key.deserializer", StringDeserializer.class); 
    kafkaParams.put("value.deserializer", StringDeserializer.class); 
    kafkaParams.put("group.id", "exastax"); 
    kafkaParams.put("auto.offset.reset", "latest"); 
    kafkaParams.put("enable.auto.commit", false); 

    Collection<String> topics = Arrays.asList("loglar"); 
    JavaInputDStream<ConsumerRecord<String, String>> stream = 
      KafkaUtils.createDirectStream(
        streamingContext, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
      ); 

    JavaPairDStream<String, String> finisStream = stream.mapToPair(record -> new Tuple2<>("", record.value())); 
    finisStream.print(); 
    JavaEsSparkStreaming.saveJsonToEs(finisStream,"spark/docs"); 
    streamingContext.start(); 
    streamingContext.awaitTermination(); 


} 

JavaEsSparkStreaming.saveJsonToEs(finisStream, 「火花/文檔」); >> finisStream不工作,因爲它不是JavaDStream。 如何轉換JavaDStream?

回答

1

JavaEsSparkStreaming.saveJsonToEs作品與JavaDStream

JavaEsSparkStreaming.saveToEsWithMeta作品與JavaPairDStream

要修復代碼:

JavaDStream<String> finisStream = stream.map(new Function<Tuple2<String, String>, String>() { 
    public String call(Tuple2<String, String> stringStringTuple2) throws Exception { 
     return stringStringTuple2._2(); 
    } 
}); 

JavaEsSparkStreaming.saveJsonToEs(finisStream,""); 
1

日Thnx很多關於答案!但我解決了這個代碼:

JavaDStream<String> stream1 = stream.map(
       new Function<ConsumerRecord<String, String>, String>() { 
        @Override 
        public String call(ConsumerRecord<String, String> r) { 
         return r.value(); 
        } 
       } 
     ); 
      JavaEsSparkStreaming.saveJsonToEs(stream1,"spark/docs");