Window
函數應該只做部分技巧。其他部分的技巧可以通過定義udf
功能
def div = udf((age: Double, lag: Double) => lag/age)
首先要做,我們需要用Window
功能找到lag
,然後傳遞lag
和age
在udf
功能找到div
進口sqlContext.implicits._ 進口org.apache.spark.sql.functions._
val dataframe = Seq(
("A",100),
("A",50),
("A",20),
("A",4)
).toDF("person", "Age")
val windowSpec = Window.partitionBy("person").orderBy(col("Age").desc)
val newDF = dataframe.withColumn("lag", lag(dataframe("Age"), 1) over(windowSpec))
最後CAL的UDF功能
newDF.filter(newDF("lag").isNotNull).withColumn("div", div(newDF("Age"), newDF("lag"))).drop("Age", "lag").show
最後的結果將是
+------+---+
|person|div|
+------+---+
| A|2.0|
| A|2.5|
| A|5.0|
+------+---+
編輯 作爲@Jacek已經提出了一個更好的解決方案使用.na.drop
,而不是.filter(newDF("lag").isNotNull)
和使用/
運營商,所以我們甚至不需要調用udf
功能
newDF.na.drop.withColumn("div", newDF("lag")/newDF("Age")).drop("Age", "lag").show
是的。這是窗口聚合函數的「工作」。你用'/'嗎? –
@JacekLaskowski,我無法在'functions'中找到'/'或類似的東西。 – summerbulb