13
使用Spark的DataFrame時,需要用戶定義的函數(UDF)將數據映射到列中。 UDF需要明確指定參數類型。在我的情況下,我需要操縱由對象數組組成的列,而我不知道使用哪種類型。這裏有一個例子:定義接受Spark DataFrame中的對象數組的UDF?
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"""
|{
| "topic" : "pets",
| "subjects" : [
| {"type" : "cat", "score" : 10},
| {"type" : "dog", "score" : 1}
| ]
|}
""")))
這是比較簡單的使用對數據內置org.apache.spark.sql.functions
進行基本操作在列
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets| 2|
+-----+--------------+
,它是一般容易編寫自定義的UDF執行任意操作
import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show
+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
| PETS| 2|
+----------+--------------+
但是如果我想使用UDF來操作「主題」列中的對象數組該怎麼辦?我在UDF中使用什麼類型的參數?例如,如果我想重新實現大小函數,而不是使用由火花所提供的一個:
val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show
顯然Array[Something]
沒有工作...我應該使用什麼類型的!?我應該完全拋棄Array[]
嗎?戳到我告訴我scala.collection.mutable.WrappedArray
可能與它有關,但仍然有另一種類型,我需要提供。
我得到這個: java.lang.UnsupportedOperationException:不支持 在org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor(ScalaReflection模式類型org.apache.spark.sql.Row。在org.apache.spark.sql.functions $ .udf(functions.scala:3076) 。org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor(ScalaReflection.scala:671) 。 .. 134 elided –
@GuruprasadGV UDF應該返回'struct'的'Product'('TupleN',case class)。 – zero323