In Spark 2.0+, you can use collect_list to achieve this:
scala> val df = sc.parallelize(Seq((1, ("v1", "v2")), (2, ("v3", "v4")), (1, ("v5", "v6")))).toDF("id", "myStruct")
df: org.apache.spark.sql.DataFrame = [id: int, myStruct: struct<_1: string, _2: string>]
scala> df.show
+---+--------+
| id|myStruct|
+---+--------+
| 1| [v1,v2]|
| 2| [v3,v4]|
| 1| [v5,v6]|
+---+--------+
scala> df.groupBy("id").agg(collect_list($"myStruct")).show
+---+----------------------+
| id|collect_list(myStruct)|
+---+----------------------+
| 1| [[v1,v2], [v5,v6]]|
| 2| [[v3,v4]]|
+---+----------------------+
However, in Spark 1.5.2 you need to do something like this:
scala> val df2 = df.select($"id", $"myStruct._1".as("p1"), $"myStruct._2".as("p2"))
df2: org.apache.spark.sql.DataFrame = [id: int, p1: string, p2: string]
scala> df2.show
+---+---+---+
| id| p1| p2|
+---+---+---+
| 1| v1| v2|
| 2| v3| v4|
| 1| v5| v6|
+---+---+---+
scala> val rdd = df2.rdd.map{case Row(id: Int, p1: String, p2: String) => (id, (p1, p2))}
rdd: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[47] at map at <console>:32
scala> val finalDF = rdd.groupByKey.map(x => (x._1, x._2.toList)).toDF("id", "structs")
finalDF: org.apache.spark.sql.DataFrame = [id: int, structs: array<struct<_1:string,_2:string>>]
scala> finalDF.show
+---+------------------+
| id| structs|
+---+------------------+
| 1|[[v1,v2], [v5,v6]]|
| 2| [[v3,v4]]|
+---+------------------+
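To illustrate the semantics, a minimal plain-Scala sketch (no Spark required) of what both collect_list and the groupByKey/toList step compute per key — group (id, struct) pairs by id and collect the structs into a list:

```scala
// The same sample data as in the DataFrame above.
val rows = Seq((1, ("v1", "v2")), (2, ("v3", "v4")), (1, ("v5", "v6")))

// Group by id and collect the (p1, p2) tuples into a list per key,
// mirroring df.groupBy("id").agg(collect_list(...)) / rdd.groupByKey.
val grouped: Map[Int, List[(String, String)]] =
  rows.groupBy(_._1).map { case (id, pairs) => (id, pairs.map(_._2).toList) }

// grouped(1) == List(("v1","v2"), ("v5","v6"))
// grouped(2) == List(("v3","v4"))
```

Note that groupBy on a Seq preserves the encounter order of elements within each group, whereas the order produced by collect_list after a shuffle is not guaranteed.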
Correct, but I believe 'collect_list' was only introduced in Spark 1.6.0, so the OP can't use it (Spark 1.5.2) unless they upgrade... –
You're right, good catch. –
Yes, unfortunately I can't upgrade my Spark, so I can't use collect_list. – alexgbelov