1
我想編寫Spark UDAF,其中列的類型可以是任何具有定義在其上的Scala數字的列。我在互聯網上搜索過,但只找到像DoubleType,LongType這樣的具體類型的例子。這不可能嗎?但是,如何將UDAF與其他數值一起使用呢?Spark UDAF - 使用泛型作爲輸入類型?
我想編寫Spark UDAF,其中列的類型可以是任何具有定義在其上的Scala數字的列。我在互聯網上搜索過,但只找到像DoubleType,LongType這樣的具體類型的例子。這不可能嗎?但是,如何將UDAF與其他數值一起使用呢?Spark UDAF - 使用泛型作爲輸入類型?
爲簡單起見,我們假設您想定義一個自定義sum
。你必須提供輸入類型TypeTag
並使用Scala的反射來定義模式:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import scala.reflect.runtime.universe._
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
case class MySum [T : TypeTag](implicit n: Numeric[T])
extends UserDefinedAggregateFunction {
val dt = schemaFor[T].dataType
def inputSchema = new StructType().add("x", dt)
def bufferSchema = new StructType().add("x", dt)
def dataType = dt
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, n.zero)
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, n.plus(buffer.getAs[T](0), input.getAs[T](0)))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, n.plus(buffer1.getAs[T](0), buffer2.getAs[T](0)))
}
def evaluate(buffer: Row) = buffer.getAs[T](0)
}
定義爲上面我們可以創建實例處理特定類型的函數:
val sumOfLong = MySum[Long]
spark.range(10).select(sumOfLong($"id")).show
+---------+
|mysum(id)|
+---------+
| 45|
+---------+
注意:
爲了獲得與內置集合函數相同的靈活性,您必須定義您自己的AggregateFunction
,如ImperativeAggregate
或DeclarativeAggregate
。這是可能的,但它是一個內部API。