2016-08-24 87 views
2

任何人都可以請解釋爲什麼案例,Seq[Row]用於具有元素集合的dataframe字段的爆炸後。 也可以請你解釋爲什麼asInstanceOf需要從爆炸場獲得數值的原因?火花數據幀爆炸功能

下面是語法:

val explodedDepartmentWithEmployeesDF = departmentWithEmployeesDF.explode(departmentWithEmployeesDF("employees")) {  
          case Row(employee: Seq[Row]) => 
          employee.map(employee => 
          Employee(employee(0).asInstanceOf[String], 
          employee(1).asInstanceOf[String], employee(2).asInstanceOf[String])) } 

回答

0

我覺得你可以閱讀更多有關文件,首先做一個測試。

數據幀的爆炸仍然返回一個數據幀。並接受一個lambda函數f:(Row)⇒TraversableOnce[A]作爲參數。

在lambda函數中,您將按大小寫匹配輸入。你已經知道你的輸入將是員工的Row,它仍然是Row的Seq。所以輸入的情況將Row(員工:Seq [Row]),如果你不理解這個部分,你可以學習更多有關scala中的無用函數的信息。

而且,員工(我相信您應該在這裏使用員工)作爲行的Seq,將應用地圖功能將每行映射到員工。你將使用scala apply函數獲取這一行中的第i個值。但是返回值是一個Object,所以你必須使用asInstanceOf將它轉換爲你所期望的類型。

2

首先我會注意到,我無法解釋爲什麼你的explode()變成Row(employee: Seq[Row]),因爲我不知道DataFrame的模式。我必須假設它與你的數據結構有關。

不知道你的原始數據,我創建了一個小數據集從

scala> val df = sc.parallelize(Array((1, "dsfds dsf dasf dsf dsf d"), (2, "2344 2353 24 23432 234"))).toDF("id", "text") 
df: org.apache.spark.sql.DataFrame = [id: int, text: string] 

工作,如果我現在在地圖,你可以SE,它返回一個包含任何類型的數據行。

scala> df.map {case row: Row => (row(0), row(1)) } 
res21: org.apache.spark.rdd.RDD[(Any, Any)] = MapPartitionsRDD[17] at map at <console>:33 

你已經基本上失去了類型信息,這就是爲什麼你需要明確指定的類型,當你想在該行中使用的數據

scala> df.map {case row: Row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[String]) } 
res22: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[18] at map at <console>:33 

因此,爲了要爆炸了,我必須做到以下幾點

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

import org.apache.spark.sql.Row 
df.explode(col("id"), col("text")) {case row: Row => 
    val id = row(0).asInstanceOf[Int] 
    val words = row(1).asInstanceOf[String].split(" ") 
    words.map(word => (id, word)) 
} 

// Exiting paste mode, now interpreting. 

import org.apache.spark.sql.Row 
res30: org.apache.spark.sql.DataFrame = [id: int, text: string, _1: int, _2: string] 

scala> res30 show 
+---+--------------------+---+-----+ 
| id|    text| _1| _2| 
+---+--------------------+---+-----+ 
| 1|dsfds dsf dasf ds...| 1|dsfds| 
| 1|dsfds dsf dasf ds...| 1| dsf| 
| 1|dsfds dsf dasf ds...| 1| dasf| 
| 1|dsfds dsf dasf ds...| 1| dsf| 
| 1|dsfds dsf dasf ds...| 1| dsf| 
| 1|dsfds dsf dasf ds...| 1| d| 
| 2|2344 2353 24 2343...| 2| 2344| 
| 2|2344 2353 24 2343...| 2| 2353| 
| 2|2344 2353 24 2343...| 2| 24| 
| 2|2344 2353 24 2343...| 2|23432| 
| 2|2344 2353 24 2343...| 2| 234| 
+---+--------------------+---+-----+ 

如果你想命名的列,您可以定義的情況下類來保存你的數據爆炸

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

import org.apache.spark.sql.Row 
case class ExplodedData(word: String) 
df.explode(col("id"), col("text")) {case row: Row => 
    val words = row(1).asInstanceOf[String].split(" ") 
    words.map(word => ExplodedData(word)) 
} 

// Exiting paste mode, now interpreting. 

import org.apache.spark.sql.Row 
defined class ExplodedData 
res35: org.apache.spark.sql.DataFrame = [id: int, text: string, word: string] 

scala> res35.select("id","word").show 
+---+-----+ 
| id| word| 
+---+-----+ 
| 1|dsfds| 
| 1| dsf| 
| 1| dasf| 
| 1| dsf| 
| 1| dsf| 
| 1| d| 
| 2| 2344| 
| 2| 2353| 
| 2| 24| 
| 2|23432| 
| 2| 234| 
+---+-----+ 

希望這會帶來一些清晰。