2016-07-28 71 views
5

我做了一個簡單的UDF轉換或在火花temptabl提取時間字段中的值。我註冊函數,但是當我使用sql調用函數時,它會拋出一個NullPointerException異常。以下是我的功能和執行它的過程。我正在使用齊柏林飛艇。這件事昨天正在起作用,但今天早上停止了工作。Scala和星火UDF功能

功能

def convert(time:String) : String = { 
    val sdf = new java.text.SimpleDateFormat("HH:mm") 
    val time1 = sdf.parse(time) 
    return sdf.format(time1) 
} 

註冊功能

sqlContext.udf.register("convert",convert _) 

測試功能,無需SQL - 這工作

convert(12:12:12) -> returns 12:12 

試驗飛艇與SQL函數失敗。的不是Temptable

root 
|-- date: string (nullable = true) 
|-- time: string (nullable = true) 
|-- serverip: string (nullable = true) 
|-- request: string (nullable = true) 
|-- resource: string (nullable = true) 
|-- protocol: integer (nullable = true) 
|-- sourceip: string (nullable = true) 

,我得到堆棧跟蹤的部分

%sql 
select convert(time) from temptable limit 10 

結構。

java.lang.NullPointerException 
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643) 
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652) 
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54) 
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44) 

回答

7

使用UDF代替定義一個函數直接

import org.apache.spark.sql.functions._ 

val convert = udf[String, String](time => { 
     val sdf = new java.text.SimpleDateFormat("HH:mm") 
     val time1 = sdf.parse(time) 
     sdf.format(time1) 
    } 
) 

一個UDF的輸入參數是列(或列)。而返回類型是Column。

case class UserDefinedFunction protected[sql] (
    f: AnyRef, 
    dataType: DataType, 
    inputTypes: Option[Seq[DataType]]) { 

    def apply(exprs: Column*): Column = { 
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil))) 
    } 
}