
How do I compute the average salary for each location in Spark Scala, given the two datasets below? Computing an average with Spark Scala

File1.csv (column 4 is the salary)

Ram, 30, Engineer, 40000 
Bala, 27, Doctor, 30000 
Hari, 33, Engineer, 50000 
Siva, 35, Doctor, 60000 

File2.csv (column 2 is the location)

Hari, Bangalore 
Ram, Chennai 
Bala, Bangalore 
Siva, Chennai 

The files above are not sorted. I need to join the two files and find the average salary for each location. I tried the code below but could not get it to work.

val salary = sc.textFile("File1.csv").map(e => e.split(",")) 
val location = sc.textFile("File2.csv").map(e.split(",")) 
val joined = salary.map(e=>(e(0),e(3))).join(location.map(e=>(e(0),e(1))) 
val joinedData = joined.sortByKey() 
val finalData = joinedData.map(v => (v._1,v._2._1._1,v._2._2)) 
val aggregatedDF = finalData.map(e=> e.groupby(e(2)).agg(avg(e(1))))  
aggregatedDF.repartition(1).saveAsTextFile("output.txt") 

Please help with the code, and show what the sample output would look like.

Many thanks.

Answers


I would use the DataFrame API; this should work:

// Needed for toDF and the $"col" syntax (available by default in spark-shell)
import spark.implicits._

val salary = sc.textFile("File1.csv") 
       .map(_.split(",").map(_.trim)) 
       .map{case Array(name,_,_,salary) => (name,salary)} 
       .toDF("name","salary") 

val location = sc.textFile("File2.csv") 
       .map(_.split(",").map(_.trim)) 
       .map{case Array(name,location) => (name,location)} 
       .toDF("name","location") 

import org.apache.spark.sql.functions._ 

salary 
    .join(location, Seq("name")) 
    .groupBy($"location") 
    .agg(
      avg($"salary").as("avg_salary") 
    ) 
    .repartition(1) 
    .write.csv("output.csv") 

So would the final output here look like the following?

+---------+----------+
| location|avg_salary|
+---------+----------+
|Bangalore|     40000|
|  Chennai|    500000|
+---------+----------+
– akrockz
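For reference, on the sample data the Bangalore average works out to (30000 + 50000) / 2 = 40000 and the Chennai average to (40000 + 60000) / 2 = 50000. A minimal way to inspect the result before writing it out, assuming the join/groupBy/agg expression from the answer is first assigned to a value (the name result below is hypothetical, not part of the original snippet):

val result = salary 
    .join(location, Seq("name")) 
    .groupBy($"location") 
    .agg(avg($"salary").as("avg_salary")) 

result.show() 
// +---------+----------+ 
// | location|avg_salary| 
// +---------+----------+ 
// |Bangalore|   40000.0| 
// |  Chennai|   50000.0| 
// +---------+----------+ 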


One more doubt: suppose that instead of a salary, the column holds dimensions like 600*200 (length*width); how do I find the averages in that case? Ram 600*200, Hari 700*300, and so on... – akrockz


You can do something like this:

// Split each line on commas and trim the fields
val salary = sc.textFile("File1.csv").map(_.split(",").map(_.trim)) 
val location = sc.textFile("File2.csv").map(_.split(",").map(_.trim)) 
// Key both RDDs by name and join: (name, (salary, location))
val joined = salary.map(e => (e(0), e(3).toInt)).join(location.map(e => (e(0), e(1)))) 
// Re-key by location: (location, salary)
val locSalary = joined.map(v => (v._2._2, v._2._1)) 
// Accumulate (count, sum) per location, then divide to get the average
val averages = locSalary.aggregateByKey((0, 0))(
      (t, e) => (t._1 + 1, t._2 + e), 
      (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
    ).mapValues(t => t._2 / t._1) 

Then averages.take(10) gives:

res5: Array[(String, Int)] = Array((Chennai,50000), (Bangalore,40000)) 
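Note that with the (0, 0) zero value both the running sum and the final division use Int arithmetic, so the result is a truncated integer average. If fractional averages are wanted, a small variant of the same aggregateByKey pattern with a Double sum (a sketch, not part of the original answer) would be:

// Same (count, sum) accumulation, but with a Double sum so the average keeps its fractional part
val exactAverages = locSalary 
    .aggregateByKey((0, 0.0))(
      (t, e) => (t._1 + 1, t._2 + e), 
      (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
    ) 
    .mapValues { case (count, sum) => sum / count } 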

Thanks for your reply. Suppose that instead of a salary, the column holds dimensions like 600*200 (length*width); how do I find the averages in that case? Ram 600*200, Hari 700*300, and so on... – akrockz


Are the dimensions given as strings? Do you want the average area (length times width), or the average of each dimension separately? – Harald


I want the average of each of these dimensions, grouped by location. – akrockz
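A minimal sketch of how the same RDD approach could be extended to average each dimension per location, assuming File1.csv carried an extra fifth column with strings such as 600*200 (that column and the value names dims/locDims/avgDims are assumptions, not part of the original answer):

// Sketch only: assumes a 5th column in File1.csv such as "600*200" (length*width)
val dims = sc.textFile("File1.csv").map(_.split(",").map(_.trim)) 
    .map { e => 
      val Array(len, width) = e(4).split("\\*").map(_.toDouble) 
      (e(0), (len, width))                                   // (name, (length, width))
    } 
val locDims = dims 
    .join(location.map(e => (e(0), e(1))))                   // location RDD from the answer above
    .map { case (_, ((len, width), loc)) => (loc, (len, width)) } 
val avgDims = locDims 
    .aggregateByKey((0, 0.0, 0.0))(                          // (count, sumLength, sumWidth)
      (acc, d) => (acc._1 + 1, acc._2 + d._1, acc._3 + d._2), 
      (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)
    ) 
    .mapValues { case (n, sumLen, sumW) => (sumLen / n, sumW / n) }   // (avgLength, avgWidth) per location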


I would use DataFrames. First read them in like this:

val salary = spark.read.option("header", "true").csv("File1.csv") 
val location = spark.read.option("header", "true").csv("File2.csv") 

If you don't have a header, you need to set the option to "false" and change the default column names, with withColumnRenamed or, as below, toDF:

val salary = spark.read.option("header", "false").csv("File1.csv").toDF("name", "age", "job", "salary") 
val location = spark.read.option("header", "false").csv("File2.csv").toDF("name", "location") 
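For completeness, the withColumnRenamed route mentioned above would look roughly like this (spark.read.csv assigns the default names _c0, _c1, ... when there is no header):

// Rename the headerless defaults _c0.._c3 one by one instead of using toDF
val salary = spark.read.option("header", "false").csv("File1.csv") 
    .withColumnRenamed("_c0", "name") 
    .withColumnRenamed("_c1", "age") 
    .withColumnRenamed("_c2", "job") 
    .withColumnRenamed("_c3", "salary") 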

Now do the join:

val joined = salary.join(location, "name") 

Finally compute the average (avg here is org.apache.spark.sql.functions.avg):

val avgSalary = joined.groupBy("location").agg(avg($"salary")) 

To save, do:

avgSalary.repartition(1).write.csv("output.csv") 

Thanks for your reply. Suppose that instead of a salary, the column holds dimensions like 600*200 (length*width); how do I find the averages in that case? Ram 600*200, Hari 700*300, and so on... – akrockz


What do you mean? Do you mean there are multiple such columns for each name? –


You can read the CSV files as DataFrames, then join and group them to get the averages:

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
    "name", "age", "title", "salary" 
) 

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
    "name", "location" 
) 

import org.apache.spark.sql.functions._ 

val dfAverage = df1.join(df2, Seq("name")). 
    groupBy(df2("location")).agg(avg(df1("salary")).as("average")). 
    select("location", "average") 

dfAverage.show 
+---------+-------+ 
| location|average| 
+---------+-------+ 
|Bangalore|40000.0| 
|  Chennai|50000.0| 
+---------+-------+ 

[Update] To compute the average dimensions:

// file1.csv: 
Ram,30,Engineer,40000,600*200 
Bala,27,Doctor,30000,800*400 
Hari,33,Engineer,50000,700*300 
Siva,35,Doctor,60000,600*200 

// file2.csv 
Hari,Bangalore 
Ram,Chennai 
Bala,Bangalore 
Siva,Chennai 

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
    "name", "age", "title", "salary", "dimensions" 
) 

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
    "name", "location" 
) 

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.IntegerType 

val dfAverage = df1.join(df2, Seq("name")). 
    groupBy(df2("location")). 
    agg(
    avg(split(df1("dimensions"), ("\\*")).getItem(0).cast(IntegerType)).as("avg_length"), 
    avg(split(df1("dimensions"), ("\\*")).getItem(1).cast(IntegerType)).as("avg_width") 
). 
    select(
    $"location", $"avg_length", $"avg_width", 
    concat($"avg_length", lit("*"), $"avg_width").as("avg_dimensions") 
) 

dfAverage.show 
+---------+----------+---------+--------------+ 
| location|avg_length|avg_width|avg_dimensions| 
+---------+----------+---------+--------------+ 
|Bangalore|     750.0|    350.0|   750.0*350.0| 
|  Chennai|     600.0|    200.0|   600.0*200.0| 
+---------+----------+---------+--------------+ 

Thanks for your reply. Suppose that instead of a salary, the column holds dimensions like 600*200 (length*width); how do I find the averages in that case? Ram 600*200, Hari 700*300, and so on... – akrockz


@akrockz, please see the expanded answer. –


Thanks a lot @Leo C.. this is exactly what I was looking for.. one last request.. I don't currently have Spark installed on my laptop.. if I send you a mail, could you send me the output for this input data? Sorry to ask for so much.. thanks – akrockz