2017-07-24 68 views
0

我具有以下數據集:火花定製聚合> = 2.0(階)

val myDS = List(("a",1,1.1), ("b",2,1.2), ("a",3,3.1), ("b",4,1.4), ("a",5,5.1)).toDS 
// and aggregation 
// myDS.groupByKey(t2 => t2._1).agg(myAvg).collect() 

欲編寫自定義集合函數myAvg這需要Tuple3參數和返回sum(_._2)/sum(_._3)。 我知道,它可以用其他方式計算,但我想編寫自定義聚合。

我寫了這樣的事情:

import org.apache.spark.sql.expressions.Aggregator 
    import org.apache.spark.sql.{Encoder, Encoders} 

    val myAvg = new Aggregator[Tuple3[String, Integer, Double], 
           Tuple2[Integer,Double], 
           Double] { 
     def zero: Tuple2[Integer,Double] = Tuple2(0,0.0) 
     def reduce(agg: Tuple2[Integer,Double], 
       a: Tuple3[String, Integer,Double]): Tuple2[Integer,Double] = 
           Tuple2(agg._1 + a._2, agg._2 + a._3) 
     def merge(agg1: Tuple2[Integer,Double], 
       agg2: Tuple2[Integer,Double]): Tuple2[Integer,Double] = 
           Tuple2(agg1._1 + agg2._1, agg1._2 + agg2._2) 
     def finish(res: Tuple2[Integer,Double]): Double = res._1/res._2 
     def bufferEncoder: Encoder[(Integer, Double)] = 
           Encoders.tuple(Encoders.INT, Encoders.scalaDouble) 
     def outputEncoder: Encoder[Double] = Encoders.scalaDouble 
    }.toColumn() 

不幸的是我收到以下錯誤:

java.lang.RuntimeException: Unsupported literal type class scala.runtime.BoxedUnit() 
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75) 
    at org.apache.spark.sql.functions$.lit(functions.scala:101) 
    at org.apache.spark.sql.Column.apply(Column.scala:217) 

有什麼不對?

在我的本地星火2.1我收到一個警告

warning: there was one deprecation warning; re-run with -deprecation for details 

什麼在我的代碼已經過時?

感謝您的任何建議。

+0

一分鐘後...但是,錯誤信息是正確的... 問題出在'column()'=>'column' – SmallerThan

回答

1

看來,這裏的問題是您使用Java的Integer,而不是Scala的Int的 - 如果你Int替換的Integer所有使用您的聚合執行(與Encoders.scalaInt取代Encoders.INT) - 這個按預期工作:

val myAvg: TypedColumn[(String, Int, Double), Double] = new Aggregator[(String, Int, Double), (Int, Double), Double] { 
    def zero: (Int, Double) = Tuple2(0,0.0) 
    def reduce(agg: (Int, Double), a: (String, Int, Double)): (Int, Double) = 
    (agg._1 + a._2, agg._2 + a._3) 
    def merge(agg1: (Int, Double), agg2: (Int, Double)): (Int, Double) = 
    (agg1._1 + agg2._1, agg1._2 + agg2._2) 
    def finish(res: (Int, Double)): Double = res._1/res._2 
    def bufferEncoder: Encoder[(Int, Double)] = 
    Encoders.tuple(Encoders.scalaInt, Encoders.scalaDouble) 
    def outputEncoder: Encoder[Double] = Encoders.scalaDouble 
}.toColumn 

(也應用了一些語法糖,刪除了明確的Tuble引用)。