2

我想計算Spark數據框上的組分位數(使用PySpark)。無論是近似還是精確的結果都可以。我更喜歡在groupBy/agg的上下文中使用的解決方案,以便我可以將其與其他PySpark聚合函數混合使用。如果由於某種原因無法實現,則採用不同的方法也可以。PySpark組中的中位數/分位數通過

This question是相關的,但並不指示如何使用approxQuantile作爲聚合函數。

我也有權訪問percentile_approx Hive UDF,但我不知道如何將它用作聚合函數。

對於特異性起見,假設我有以下數據框:

from pyspark import SparkContext 
import pyspark.sql.functions as f 

sc = SparkContext()  

df = sc.parallelize([ 
    ['A', 1], 
    ['A', 2], 
    ['A', 3], 
    ['B', 4], 
    ['B', 5], 
    ['B', 6], 
]).toDF(('grp', 'val')) 

df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val')) 
df_grp.show() 

預期的結果是:

+----+-------+ 
| grp|med_val| 
+----+-------+ 
| A|  2| 
| B|  5| 
+----+-------+ 
+0

請提供一個明確的例子,說明你想要達到的目標以及一些示例數據 - 不清楚爲什麼鏈接的答案不適用於你的案例 – desertnaut

+0

簡短的回答是,問題和答案都不使用單詞「組」或「聚合」。但我會按照你的建議更新這個問題。 – abeboparebop

+0

我認爲你可以在這個實例中使用基礎rdd和算法來計算分佈式分位數,例如[這裏](https://dataorigami.net/blogs/napkin-folding/19055451-percentile-and-quantile-estimation-of-big-data-the-t-digest)及其中的鏈接。事實上,他們鏈接到的github有一些pyspark的例子。 – ags29

回答

4

既然你有機會獲得percentile_approx,一個簡單的解決辦法是將在SQL命令中使用它:

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

df.registerTempTable("df") 
df2 = sqlContext.sql("select grp, percentile_approx(val, 0.5) as med_val from df group by grp") 
+0

這有效,但我更喜歡在PySpark級別的'groupBy' /'agg'中使用的解決方案(以便我可以輕鬆地將它與其他PySpark聚合函數混合使用)。 – abeboparebop

+0

@abeboparebop我不相信它可能只使用'groupBy'和'agg',但是,使用基於窗口的方法也應該可行。 – Shaido

+1

我已經澄清了我在問題中的理想解決方案。很顯然,這個答案能完成這項工作,但這並不是我想要的。我會留下一段時間的問題,看看是否有更清晰的答案。 – abeboparebop

4

不幸的是,就我所知,似乎用「純粹的」PySpark命令(Shaido的解決方案提供了SQL的解決方法)來做到這一點是不可能的,原因很簡單:在與其他集合函數相比,如mean,approxQuantile不返回Column類型,而是列表

讓我們看看你的樣本數據一個簡單的例子:

spark.version 
# u'2.2.0' 

import pyspark.sql.functions as func 
from pyspark.sql import DataFrameStatFunctions as statFunc 

# aggregate with mean works OK: 
df_grp_mean = df.groupBy('grp').agg(func.mean(df['val']).alias('mean_val')) 
df_grp_mean.show() 
# +---+--------+ 
# |grp|mean_val| 
# +---+--------+ 
# | B|  5.0| 
# | A|  2.0| 
# +---+--------+ 

# try aggregating by median: 
df_grp_med = df.groupBy('grp').agg(statFunc(df).approxQuantile('val', [0.5], 0.1)) 
# AssertionError: all exprs should be Column 

# mean aggregation is a Column, but median is a list: 

type(func.mean(df['val'])) 
# pyspark.sql.column.Column 

type(statFunc(df).approxQuantile('val', [0.5], 0.1)) 
# list 

我懷疑,基於窗口的方法將使任何區別,因爲正如我所說的根本原因是一個非常基本的一個。

另請參閱my answer here瞭解更多詳情。