2016-09-07 100 views
0

我正面臨一個問題,我必須找出最大的行及其索引。這是我的方法使用Spark java查找最大行數

SparkConf conf = new SparkConf().setMaster("local").setAppName("basicavg"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> rdd = sc.textFile("/home/impadmin/ravi.txt"); 
    JavaRDD<Tuple2<Integer,String>> words = rdd.map(new Function<String, Tuple2<Integer,String>>() { 

     @Override 
     public Tuple2<Integer,String> call(String v1) throws Exception { 
      // TODO Auto-generated method stub 
      return new Tuple2<Integer, String>(v1.split(" ").length, v1); 
     } 
    }); 
    JavaPairRDD<Integer, String> linNoToWord = JavaPairRDD.fromJavaRDD(words).sortByKey(false); 

    System.out.println(linNoToWord.first()._1+" ********************* "+linNoToWord.first()._2); 
+2

請描述你的具體問題。你的方法失敗了嗎? – YakovL

+0

問題是在文件中使用spark和line一起查找最大行的索引。 –

+0

@RaviShankar下面的答案會給你從0開始的行索引。 –

回答

0

這樣的tupleRDD將獲得關鍵的基礎和第一個元素進行排序在排序後的新rdd是最長的:

JavaRDD<String> rdd = sc.textFile("/home/impadmin/ravi.txt"); 
JavaRDD<Tuple2<Integer,String>> words = rdd.map(new Function<String, Tuple2<Integer,String>>() { 

    @Override 
    public Tuple2<Integer,String> call(String v1) throws Exception { 
     // TODO Auto-generated method stub 
     return new Tuple2<Integer, String>(v1.split(" ").length, v1); 
    } 
}); 
JavaRDD<Tuple2<Integer,String>> tupleRDD1= tupleRDD.sortBy(new Function<Tuple2<Integer,String>, Integer>() { 

     @Override 
     public Integer call(Tuple2<Integer, String> v1) throws Exception { 
      // TODO Auto-generated method stub 
      return v1._1; 
     } 
    }, false, 1); 
    System.out.println(tupleRDD1.first()); 
} 
0

既然你關心的行號和文字都請試試這個。

首先創建一個序列化類線:

public static class Line implements Serializable { 
    public Line(Long lineNo, String text) { 
     lineNo_ = lineNo; 
     text_ = text; 
    } 
    public Long lineNo_; 
    public String text_; 
} 

然後執行以下操作:

SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("basicavg"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> rdd = sc.textFile("/home/impadmin/words.txt"); 
    JavaPairRDD<Long, Line> linNoToWord2 = rdd.zipWithIndex().mapToPair(new PairFunction<Tuple2<String,Long>, Long, Line>() { 
     public Tuple2<Long, Line> call(Tuple2<String, Long> t){ 

      return new Tuple2<Long, Line>(Long.valueOf(t._1.split(" ").length), new Line(t._2, t._1)); 
     } 
    }).sortByKey(false); 

    System.out.println(linNoToWord2.first()._1+" ********************* "+linNoToWord2.first()._2.text_);