2

How to condense an array column in a Spark DataFrame

Suppose I have the following DataFrame:

id | myStruct 
___________________ 
1 | [val1, val2] 
___________________ 
2 | [val3, val4] 
___________________ 
1 | [val5, val6] 

I want to group all the myStructs that share the same id into an array column. So the DataFrame above should become:

id | myStruct 
__________________________________ 
1 | [[val1, val2], [val5, val6]] 
__________________________________ 
2 | [[val3, val4]] 

I know there is an array function, but it only turns each column into an array of size 1. How do I merge the resulting arrays?
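For example, with the frame above (wrapped is just an illustrative name):

import org.apache.spark.sql.functions.array

// array() wraps each row's struct in a one-element array;
// rows sharing an id remain separate rows.
df.select($"id", array($"myStruct").alias("wrapped"))
// 1 | [[val1, val2]]
// 2 | [[val3, val4]]
// 1 | [[val5, val6]]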

I'm using Spark 1.5.2 in the Scala shell.

Since I'm on Spark 1.5.2, I can't use collect_list or collect_set.

Answers

3

If you are using Spark 1.5 and cannot upgrade, the simplest option is RDD.groupByKey:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Pair each row's id with its struct, collect the structs per id,
// then rebuild Rows of (id, seq-of-structs).
val rows = df.rdd
    .map { case Row(id, myStruct) => (id, myStruct) }
    .groupByKey
    .map { case (id, myStructs) => Row(id, myStructs.toSeq) }

// Keep the original id field; the new column is an array of the
// original struct type.
val schema = StructType(Seq(
    df.schema("id"),
    StructField("myStructs", ArrayType(df.schema("myStruct").dataType))
))

sqlContext.createDataFrame(rows, schema)
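Showing the result should give the grouping the question asks for:

// Expected contents, per the question:
// 1 -> [[val1, val2], [val5, val6]]
// 2 -> [[val3, val4]]
sqlContext.createDataFrame(rows, schema).show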

This can be generalized to multiple key and value columns by first packing them into "pairs" of structs:

import org.apache.spark.sql.functions.struct

// Pack the key columns into one "id" struct and the value columns
// into one "myStruct" struct; key1...keyn and val1...valn stand in
// for your actual column names.
df.select(
    struct($"key1", $"key2", ..., $"keyn").alias("id"),
    struct($"val1", $"val2", ..., $"valn").alias("myStruct")
)
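From there the exact same steps apply, with the paired frame in place of df; a sketch, assuming paired holds the result of the select above:

val groupedRows = paired.rdd
    .map { case Row(id, myStruct) => (id, myStruct) }
    .groupByKey                      // struct keys compare by value
    .map { case (id, myStructs) => Row(id, myStructs.toSeq) }

val groupedSchema = StructType(Seq(
    paired.schema("id"),
    StructField("myStructs", ArrayType(paired.schema("myStruct").dataType))
))

sqlContext.createDataFrame(groupedRows, groupedSchema)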
1

In Spark 2.0+ you can use collect_list for this:

scala> val df = sc.parallelize(Seq((1, ("v1", "v2")), (2, ("v3", "v4")), (1, ("v5", "v6")))).toDF("id", "myStruct") 
df: org.apache.spark.sql.DataFrame = [id: int, myStruct: struct<_1: string, _2: string>] 

scala> df.show 
+---+--------+
| id|myStruct|
+---+--------+
|  1| [v1,v2]|
|  2| [v3,v4]|
|  1| [v5,v6]|
+---+--------+

scala> import org.apache.spark.sql.functions.collect_list
import org.apache.spark.sql.functions.collect_list

scala> df.groupBy("id").agg(collect_list($"myStruct")).show
+---+----------------------+
| id|collect_list(myStruct)|
+---+----------------------+
|  1|    [[v1,v2], [v5,v6]]|
|  2|             [[v3,v4]]|
+---+----------------------+
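To get a friendlier column name, the aggregate can be aliased (myStructs here is just an example name); the output is the same table with the new header:

scala> df.groupBy("id").agg(collect_list($"myStruct").alias("myStructs")).show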

However, in Spark 1.5.2 you need something like this:

scala> val df2 = df.select($"id", $"myStruct._1".as("p1"), $"myStruct._2".as("p2")) 
df2: org.apache.spark.sql.DataFrame = [id: int, p1: string, p2: string] 

scala> df2.show 
+---+---+---+
| id| p1| p2|
+---+---+---+
|  1| v1| v2|
|  2| v3| v4|
|  1| v5| v6|
+---+---+---+

scala> val rdd = df2.rdd.map{case Row(id: Int, p1: String, p2: String) => (id, (p1, p2))} 
rdd: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[47] at map at <console>:32 

scala> val finalDF = rdd.groupByKey.map(x => (x._1, x._2.toList)).toDF("id", "structs") 
finalDF: org.apache.spark.sql.DataFrame = [id: int, structs: array<struct<_1:string,_2:string>>] 

scala> finalDF.show 
+---+------------------+
| id|           structs|
+---+------------------+
|  1|[[v1,v2], [v5,v6]]|
|  2|         [[v3,v4]]|
+---+------------------+
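As a quick check, printing the schema should confirm that structs is an array of structs; roughly (nullability flags may differ):

scala> finalDF.printSchema
root
 |-- id: integer (nullable = false)
 |-- structs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)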
+1

Correct, but I think 'collect_list' was only introduced in Spark 1.6.0, so the OP can't use it (Spark 1.5.2) unless they upgrade... –

+0

You're right, shoot. –

+1

Yes, unfortunately I can't upgrade my Spark, so I can't use collect_list. – alexgbelov