2017-07-25 71 views
0

我想使用Spark DataSet加載相當大的數據(比方說),其中的子集數據看起來如下所示。Spark中的關係轉換

|age|maritalStatus| name|sex| 
+---+-------------+--------+---+ 
| 35|   M| Joanna| F| 
| 25|   S|Isabelle| F| 
| 19|   S| Andy| M| 
| 70|   M| Robert| M| 
+---+-------------+--------+---+ 

我需要的是有關係的轉換,其中一列從另一列(S)獲得它的價值。 例如,根據每個人記錄的「年齡」&「性別」,我需要將Mr或Ms/Mrs放在每個「name」屬性前面。另一個例子是,對於60歲以上的「年齡」的人,我需要將他或她標爲老年公民(派生專欄「seniorCitizen」爲Y)。

我對轉換後的數據最終需要的是如下:

+---+-------------+---------------------------+---+ 
|age|maritalStatus|   name|seniorCitizen|sex| 
+---+-------------+---------------------------+---+ 
| 35|   M| Mrs. Joanna|   N| F| 
| 25|   S| Ms. Isabelle|   N| F| 
| 19|   S|  Mr. Andy|   N| M| 
| 70|   M| Mr. Robert|   Y| M| 
+---+-------------+--------+------------------+---+ 

大多數火花提供轉換是相當靜態的,而不是dyanmic。例如,如在示例herehere中定義的。

我正在使用Spark Datasets,因爲我是從關係數據源加載的,但如果您可能會建議使用普通RDD進行此操作的更好方法,請執行此操作。

+1

你可以使用Dataframes和UDF做到這一點,你可以結合whenconcat,​​3210功能。 –

+0

我想你的名字轉換應該取決於婚姻狀況而不是年齡,不是嗎? –

回答

2

您可以使用withColumn添加一個新列,用於seniorCitizen使用where子句和更新name您可以使用定義的函數(udf)用戶如下

import spark.implicits._ 

import org.apache.spark.sql.functions._ 
//create a dummy data 
val df = Seq((35, "M", "Joanna", "F"), 
    (25, "S", "Isabelle", "F"), 
    (19, "S", "Andy", "M"), 
    (70, "M", "Robert", "M") 
).toDF("age", "maritalStatus", "name", "sex") 

// create a udf to update name according to age and sex 
val append = udf((name: String, maritalStatus:String, sex: String) => { 
    if (sex.equalsIgnoreCase("F") && maritalStatus.equalsIgnoreCase("M")) s"Mrs. ${name}" 
    else if (sex.equalsIgnoreCase("F")) s"Ms. ${name}" 
    else s"Mr. ${name}" 
}) 

//add two new columns with using withColumn 
df.withColumn("name", append($"name", $"maritalStatus", $"sex")) 
    .withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y")).show 

輸出:

+---+-------------+------------+---+-------------+ 
|age|maritalStatus|  name|sex|seniorCitizen| 
+---+-------------+------------+---+-------------+ 
| 35|   M| Mrs. Joanna| F|   N| 
| 25|   S|Ms. Isabelle| F|   N| 
| 19|   S| Mr. Andy| M|   N| 
| 70|   M| Mr. Robert| M|   Y| 
+---+-------------+------------+---+-------------+ 

編輯:

這裏是輸出沒有使用UDF

df.withColumn("name", 
    when($"sex" === "F", when($"maritalStatus" === "M", concat(lit("Ms. "), df("name"))).otherwise(concat(lit("Ms. "), df("name")))) 
    .otherwise(concat(lit("Ms. "), df("name")))) 
    .withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y")) 

希望這有助於!

+0

我希望這有助於:) –

1

Spark functions可以幫助您完成工作。如下面的陳述

val updateName = when(lower($"maritalStatus") === "m" && lower($"sex") === "f", concat(lit("Mrs. "), $"name")) 
         .otherwise(when(lower($"maritalStatus") === "s" && lower($"sex") === "f", concat(lit("Ms. "), $"name")) 
         .otherwise(when(lower($"sex") === "m", concat(lit("Mr. "), $"name")))) 

val updatedDataSet = dataset.withColumn("name", updateName) 
    .withColumn("seniorCitizen", when($"age" > 60, "Y").otherwise("N")) 

updatedDataSet是你需要dataset