2017-06-06 67 views
2

在Spark-Sql版本1.6中,使用DataFrame s,是否有一種方法可以針對特定列計算每行的除以當前行和下一個行的分數?如何將當前行的值與下列值相除?

例如,如果我有一列一個表,像這樣

Age 
100 
50 
20 
4 

我想下面的輸出

Franction 
2 
2.5 
5 

最後一行被丟棄,因爲它沒有「下一個行「添加到。

現在我正在通過對錶格進行排名並將其與自身結合起來,其中rank等於rank+1

有沒有更好的方式來做到這一點? 這可以用Window函數完成嗎?

+0

是的。這是窗口聚合函數的「工作」。你用'/'嗎? –

+0

@JacekLaskowski,我無法在'functions'中找到'/'或類似的東西。 – summerbulb

回答

2

Window函數應該只做部分技巧。其他部分的技巧可以通過定義udf功能

def div = udf((age: Double, lag: Double) => lag/age) 

首先要做,我們需要用Window功能找到lag,然後傳遞lagageudf功能找到div 進口sqlContext.implicits._ 進口org.apache.spark.sql.functions._

val dataframe = Seq(
    ("A",100), 
    ("A",50), 
    ("A",20), 
    ("A",4) 
).toDF("person", "Age") 

val windowSpec = Window.partitionBy("person").orderBy(col("Age").desc) 
val newDF = dataframe.withColumn("lag", lag(dataframe("Age"), 1) over(windowSpec)) 

最後CAL的UDF功能

newDF.filter(newDF("lag").isNotNull).withColumn("div", div(newDF("Age"), newDF("lag"))).drop("Age", "lag").show 

最後的結果將是

+------+---+ 
|person|div| 
+------+---+ 
|  A|2.0| 
|  A|2.5| 
|  A|5.0| 
+------+---+ 

編輯 作爲@Jacek已經提出了一個更好的解決方案使用.na.drop,而不是.filter(newDF("lag").isNotNull)和使用/運營商,所以我們甚至不需要調用udf功能

newDF.na.drop.withColumn("div", newDF("lag")/newDF("Age")).drop("Age", "lag").show 
+2

謝謝。根據你的回答,我編寫了以下內容(無UDF):'dataframe.select($「person」,$「Age」/(lead(「Age」,1)over windowSpec)爲「div」)。 drop.show' – summerbulb

相關問題