2016-11-13 56 views
0

使用spark,我解析了csv文件,其中每行代表應用程序用戶所做的調用。解析後,我得到了JavaRDD對象,它通常包含單個用戶的多個條目。解析CSV並聚合相同的記錄

現在我想要實現的是總結每個用戶的總講話時間。我遵循了其他地方給出的單詞計數示例,並且它也在我的案例中工作,但是,我不確定是否這樣是正確的方法,因爲我必須將每個解析對象映射到一個單獨的鍵。

我寫的代碼粘貼在下面,但是,我不確定這是否正確。

JavaRDD <Subscriber> cdrs = textFile.flatMap(new FlatMapFunction < String, Subscriber >() { 
public Iterable <Subscriber> call(String line) { 
    List <Subscriber> list = new ArrayList <Subscriber>(); 
    String[] fields = line.split(","); 

    if (fields != null && fields[0].trim().matches("[0-9]+")) { 
    Subscriber subscriber = new Subscriber(); 
    subscriber.setMsisdn(fields[0].trim()); 
    subscriber.setDuration(Double.parseDouble(fields[5].replaceAll("s", ""))); 

    list.add(subscriber); 
    } 

    return list; 
} 
}); 

JavaPairRDD < String, Subscriber > counts = words.mapToPair(new PairFunction < Subscriber, String, Subscriber >() { 
public Tuple2 < String, Subscriber > call(Subscriber s) { 
    return new Tuple2 < String, Subscriber > (s.getMsisdn(), s); 
} 
}).reduceByKey(new Function2 < Subscriber, Subscriber, Subscriber >() { 
@Override 
public Subscriber call(Subscriber v1, Subscriber v2) throws Exception { 
    v1.setDuration(v1.getDuration() + v2.getDuration()); 
    return v1; 
} 
}); 
+0

您是否僅限於RDD?使用Dataframe是您的選擇嗎? – Yaron

+0

我可以使用它,它只是我沒有太多火花的想法,剛開始使用它 – Waqas

回答

0

我(用火花2.0蟒蛇火花)使用的火花數據框寫入以下僞代碼:

df = spark.read.format("csv").option("header", "true").load("csv_file.csv") 
new_df = df.groupBy("username").agg(sum("talk_time").alias("total_talk_time"); 

第一行 - 負載CSV到數據幀(見這裏https://stackoverflow.com/a/37640154/5088142我的回答更多信息)

第二行 - 列上「talk_time」的集合數據由用戶名列組,並且執行sum()函數

上GROUPBY方式/彙總可以在這裏找到:http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframegroupby-retains-grouping-columns

新的數據框應該有一個「用戶名」列和「total_talk_time」列 - 這將保存您正在尋找的數據。

您必須稍微修改才能將其作爲Java-spark執行...