2016-08-17 77 views
13

使用Spark的DataFrame時,需要用戶定義的函數(UDF)將數據映射到列中。 UDF需要明確指定參數類型。在我的情況下,我需要操縱由對象數組組成的列,而我不知道使用哪種類型。這裏有一個例子:定義接受Spark DataFrame中的對象數組的UDF?

import sqlContext.implicits._ 

// Start with some data. Each row (here, there's only one row) 
// is a topic and a bunch of subjects 
val data = sqlContext.read.json(sc.parallelize(Seq(
    """ 
    |{ 
    | "topic" : "pets", 
    | "subjects" : [ 
    | {"type" : "cat", "score" : 10}, 
    | {"type" : "dog", "score" : 1} 
    | ] 
    |} 
    """))) 

這是比較簡單的使用對數據內置org.apache.spark.sql.functions進行基本操作在列

import org.apache.spark.sql.functions.size 
data.select($"topic", size($"subjects")).show 

+-----+--------------+ 
|topic|size(subjects)| 
+-----+--------------+ 
| pets|    2| 
+-----+--------------+ 

,它是一般容易編寫自定義的UDF執行任意操作

import org.apache.spark.sql.functions.udf 
val enhance = udf { topic : String => topic.toUpperCase() } 
data.select(enhance($"topic"), size($"subjects")).show 

+----------+--------------+ 
|UDF(topic)|size(subjects)| 
+----------+--------------+ 
|  PETS|    2| 
+----------+--------------+ 

但是如果我想使用UDF來操作「主題」列中的對象數組該怎麼辦?我在UDF中使用什麼類型的參數?例如,如果我想重新實現大小函數,而不是使用由火花所提供的一個:

val my_size = udf { subjects: Array[Something] => subjects.size } 
data.select($"topic", my_size($"subjects")).show 

顯然Array[Something]沒有工作...我應該使用什麼類型的!?我應該完全拋棄Array[]嗎?戳到我告訴我scala.collection.mutable.WrappedArray可能與它有關,但仍然有另一種類型,我需要提供。

回答

16

什麼你要找的是Seq[o.a.s.sql.Row]

import org.apache.spark.sql.Row 

val my_size = udf { subjects: Seq[Row] => subjects.size } 

說明

  • 當前的ArrayType表示是,因爲你已經知道了,所以WrappedArrayArray不工作,它最好保持安全。
  • StructType的本地類型是。不幸的是,這意味着訪問單個字段是不安全的。

注意

  • 要創建struct功能傳遞給udf必須返回Product類型(Tuple*case class),不。
+0

我得到這個: java.lang.UnsupportedOperationException:不支持 在org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor(ScalaReflection模式類型org.apache.spark.sql.Row。在org.apache.spark.sql.functions $ .udf(functions.scala:3076) 。org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor(ScalaReflection.scala:671) 。 .. 134 elided –

+0

@GuruprasadGV UDF應該返回'struct'的'Product'('TupleN',case class)。 – zero323