
Maybe it's just because I'm relatively new to the API, but I feel that Spark ML methods often return DataFrames that are unnecessarily hard to work with. How do I flatten a column of arrays of structs (as returned by the Spark ML API)?

This time, it's the ALS model that's tripping me up. Specifically, the recommendForAllUsers method. Let's reconstruct the type of DataFrame it returns:

scala> import org.apache.spark.sql.types._

scala> val arrayType = ArrayType(new StructType().add("itemId", IntegerType).add("rating", FloatType))

scala> val recs = Seq((1, Array((1, .7), (2, .5))), (2, Array((0, .9), (4, .1)))). 
    toDF("userId", "recommendations"). 
    select($"userId", $"recommendations".cast(arrayType)) 

scala> recs.show()
+------+------------------+
|userId|   recommendations|
+------+------------------+
|     1|[[1,0.7], [2,0.5]]|
|     2|[[0,0.9], [4,0.1]]|
+------+------------------+

scala> recs.printSchema
root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

Now, all I care about is the itemId in the recommendations column. After all, the method is recommendForAllUsers, not recommendAndScoreForAllUsers (okay, okay, I'll stop being cheeky...).

How can I do this?

I thought I had it when I created a UDF:

scala> val itemIds = udf((arr: Array[(Int, Float)]) => arr.map(_._1)) 

But it produces an error:

scala> recs.withColumn("items", itemIds($"recommendations"))
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(recommendations)' due to data type mismatch: argument 1 requires array<struct<_1:int,_2:float>> type, however, '`recommendations`' is of array<struct<itemId:int,rating:float>> type.;;
'Project [userId#87, recommendations#92, UDF(recommendations#92) AS items#238]
+- Project [userId#87, cast(recommendations#88 as array<struct<itemId:int,rating:float>>) AS recommendations#92]
   +- Project [_1#84 AS userId#87, _2#85 AS recommendations#88]
      +- LocalRelation [_1#84, _2#85]

Any ideas? Thanks!

Answers


Wow, my colleague came up with a remarkably elegant solution:

scala> recs.select($"userId", $"recommendations.itemId").show
+------+------+
|userId|itemId|
+------+------+
|     1|[1, 2]|
|     2|[0, 4]|
+------+------+

So maybe the Spark ML API isn't that hard after all :)
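This works because dot-selecting a field through an array of structs yields an array of that field's values. A minimal sketch extending the same idea to pull out both fields at once (the aliases itemIds and ratings are my own, not from the original post):

val flattened = recs.select(
    $"userId",
    $"recommendations.itemId".as("itemIds"),
    $"recommendations.rating".as("ratings"))

flattened.printSchema
// Roughly:
// root
//  |-- userId: integer (nullable = false)
//  |-- itemIds: array (nullable = true)
//  |    |-- element: integer (containsNull = true)
//  |-- ratings: array (nullable = true)
//  |    |-- element: float (containsNull = true)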


A second answer takes a different route: with an array as the column's type, e.g. recommendations, you can be quite productive using the explode function (or the more advanced flatMap operator; a sketch of that appears after the final output below).

explode(e: Column): Column Creates a new row for each element in the given array or map column.

That gives you proper structs to work with.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.explode

// Rebuild the DataFrame from the question; $-columns and toDF come
// from spark.implicits._ (already in scope in spark-shell).
val structType = new StructType().
    add($"itemId".int).
    add($"rating".float)
val arrayType = ArrayType(structType)
val recs = Seq((1, Array((1, .7), (2, .5))), (2, Array((0, .9), (4, .1)))).
    toDF("userId", "recommendations").
    select($"userId", $"recommendations" cast arrayType)

val exploded = recs.withColumn("recs", explode($"recommendations")) 
scala> exploded.show
+------+------------------+-------+
|userId|   recommendations|   recs|
+------+------------------+-------+
|     1|[[1,0.7], [2,0.5]]|[1,0.7]|
|     1|[[1,0.7], [2,0.5]]|[2,0.5]|
|     2|[[0,0.9], [4,0.1]]|[0,0.9]|
|     2|[[0,0.9], [4,0.1]]|[4,0.1]|
+------+------------------+-------+

Structs are nice in the select operator with * (star): it flattens them into one column per struct field. You could do select($"element.*").

scala> exploded.select("userId", "recs.*").show
+------+------+------+
|userId|itemId|rating|
+------+------+------+
|     1|     1|   0.7|
|     1|     2|   0.5|
|     2|     0|   0.9|
|     2|     4|   0.1|
+------+------+------+

I think that does what you're after.
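The flatMap operator mentioned above can do the same flattening in one typed pass. A minimal sketch, assuming spark.implicits._ is in scope; the case-class names Rec and UserRecs are mine, not part of any API:

// Typed view of the recs DataFrame; field names must match the schema.
case class Rec(itemId: Int, rating: Float)
case class UserRecs(userId: Int, recommendations: Seq[Rec])

// Emit one (userId, itemId, rating) row per recommendation.
val flat = recs.as[UserRecs].flatMap { ur =>
  ur.recommendations.map(r => (ur.userId, r.itemId, r.rating))
}.toDF("userId", "itemId", "rating")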


p.s. Stay away from UDFs as far as possible, since they force a conversion of every row from the internal binary format (InternalRow) to JVM objects, which can lead to excessive GC.
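For completeness, the type mismatch in the question comes from the UDF's signature: a Scala tuple parameter maps to struct fields named _1/_2, whereas struct elements actually reach a UDF as Row values. If a UDF were truly unavoidable, a minimal sketch of one that type-checks (still prefer the built-in alternatives above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Struct elements arrive as Rows; read itemId by position (field 0).
val itemIds = udf((recs: Seq[Row]) => recs.map(_.getInt(0)))

recs.withColumn("items", itemIds($"recommendations"))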