2015-02-09 81 views
0

我正在嘗試使用Java保存MongoDB中的tweet,這就是我所擁有的;使用java將rdd保存到mongo數據庫中

JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(
      3000)); 
    JavaDStream<Status> tweets = TwitterUtils.createStream(ssc); 

    JavaDStream<String> statuses = tweets 
      .map(new Function<Status, String>() { 
       public String call(Status status) { 
        return status.getUser().getName() + ":" 
          + status.getText(); 
       } 
      }); 

    JavaDStream<String> users = tweets.map(new Function<Status, String>() { 
     public String call(Status status) { 
      return status.getUser().getName(); 
     } 
    }); 


    users.foreachRDD(new Function<JavaRDD<String>, Void>() { 
     public Void call(JavaRDD<String> rdd) throws Exception { 
      if (rdd.count() > 0) 
       rdd.saveAsTextFile("storage/users/test" + rdd.id() 
         + "_.txt"); 
      return null; 
     } 
    }); 

就像你看到的,我可以用rdd.saveAsTextFile存儲用戶的文本文件,但我需要的是一種方法,這種RDD保存到數據庫(MongoDB的)。

回答

1

可以使用MongoDB Hadoop Connector存儲保存RDD使用com.mongodb.hadoop.MongoOutputFormat到MongoDB的:

Configuration config = new Configuration(); 
config.set("mongo.output.format", "com.mongodb.hadoop.MongoOutputFormat"); 
config.set("mongo.output.uri", "mongodb://host:port/database.collection"); 
rdd.saveAsNewAPIHadoopFile("file://this-is-not-used", 
          <keyClass>, 
          <valueClass>, 
          MongoOutputFormat.class, 
          config); 

它也可能是有用的看an example project如何做到這一點。