使用spark,我解析了csv文件,其中每行代表應用程序用戶所做的調用。解析後,我得到了JavaRDD對象,它通常包含單個用戶的多個條目。解析CSV並聚合相同的記錄
現在我想要實現的是總結每個用戶的總講話時間。我遵循了其他地方給出的單詞計數示例,並且它也在我的案例中工作,但是,我不確定是否這樣是正確的方法,因爲我必須將每個解析對象映射到一個單獨的鍵。
我寫的代碼粘貼在下面,但是,我不確定這是否正確。
JavaRDD <Subscriber> cdrs = textFile.flatMap(new FlatMapFunction < String, Subscriber >() {
public Iterable <Subscriber> call(String line) {
List <Subscriber> list = new ArrayList <Subscriber>();
String[] fields = line.split(",");
if (fields != null && fields[0].trim().matches("[0-9]+")) {
Subscriber subscriber = new Subscriber();
subscriber.setMsisdn(fields[0].trim());
subscriber.setDuration(Double.parseDouble(fields[5].replaceAll("s", "")));
list.add(subscriber);
}
return list;
}
});
JavaPairRDD < String, Subscriber > counts = words.mapToPair(new PairFunction < Subscriber, String, Subscriber >() {
public Tuple2 < String, Subscriber > call(Subscriber s) {
return new Tuple2 < String, Subscriber > (s.getMsisdn(), s);
}
}).reduceByKey(new Function2 < Subscriber, Subscriber, Subscriber >() {
@Override
public Subscriber call(Subscriber v1, Subscriber v2) throws Exception {
v1.setDuration(v1.getDuration() + v2.getDuration());
return v1;
}
});
您是否僅限於RDD?使用Dataframe是您的選擇嗎? – Yaron
我可以使用它,它只是我沒有太多火花的想法,剛開始使用它 – Waqas