I have the following Spark DataFrame: how can I divide a numeric column into ranges and assign a label to each range in Apache Spark?

id weekly_sale 
1 40000 
2 120000 
3 135000 
4 211000 
5 215000 
6 331000 
7 337000 

I need to see which of the following intervals the items in the weekly_sale column fall into:

under 100000 
between 100000 and 200000 
between 200000 and 300000 
more than 300000 

So my expected output would look like this:

id weekly_sale label 
1 40000  under 100000  
2 120000  between 100000 and 200000 
3 135000  between 100000 and 200000 
4 211000  between 200000 and 300000 
5 215000  between 200000 and 300000 
6 331000  more than 300000 
7 337000  more than 300000 

Any PySpark, Spark SQL, or HiveContext implementation would help me.


@eliasah I would appreciate your help – sanaz


Well, have you tried anything? There are many ways to achieve this, and none of them is particularly complicated. – zero323


@zero323 Actually I was just thinking of registering the DF as a table and then using CASE WHEN... What do you think? – sanaz
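A minimal sketch of that registered-table CASE WHEN idea, assuming a SQLContext or HiveContext named sqlContext and the df defined in the answer below (the table name sales is illustrative):

# Register the DataFrame as a temporary table, then label rows with CASE WHEN.
df.registerTempTable("sales")

labeled = sqlContext.sql("""
    SELECT id, weekly_sale,
           CASE
               WHEN weekly_sale < 100000 THEN 'under 100000'
               WHEN weekly_sale < 200000 THEN 'between 100000 and 200000'
               WHEN weekly_sale < 300000 THEN 'between 200000 and 300000'
               ELSE 'more than 300000'
           END AS label
    FROM sales
""")

labeled.show(10, False)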

Answer

Assuming the ranges and labels are defined as follows:

splits = [float("-inf"), 100000.0, 200000.0, 300000.0, float("inf")] 
labels = [ 
    "under 100000", "between 100000 and 200000", 
    "between 200000 and 300000", "more than 300000"] 

df = sc.parallelize([ 
    (1, 40000.0), (2, 120000.0), (3, 135000.0), 
    (4, 211000.0), (5, 215000.0), (6, 331000.0), 
    (7, 337000.0) 
]).toDF(["id", "weekly_sale"]) 

One possible approach is to use Bucketizer:

from pyspark.ml.feature import Bucketizer 
from pyspark.sql.functions import array, col, lit 

bucketizer = Bucketizer(
    splits=splits, inputCol="weekly_sale", outputCol="split" 
) 

with_split = bucketizer.transform(df) 

and attach the labels later:

label_array = array(*(lit(label) for label in labels)) 

with_split.withColumn(
    "label", label_array.getItem(col("split").cast("integer")) 
).show(10, False) 

## +---+-----------+-----+-------------------------+
## |id |weekly_sale|split|label                    |
## +---+-----------+-----+-------------------------+
## |1  |40000.0    |0.0  |under 100000             |
## |2  |120000.0   |1.0  |between 100000 and 200000|
## |3  |135000.0   |1.0  |between 100000 and 200000|
## |4  |211000.0   |2.0  |between 200000 and 300000|
## |5  |215000.0   |2.0  |between 200000 and 300000|
## |6  |331000.0   |3.0  |more than 300000         |
## |7  |337000.0   |3.0  |more than 300000         |
## +---+-----------+-----+-------------------------+

There are of course many different ways to achieve the same goal. For example, you can create a lookup table:

from toolz import sliding_window 
from pyspark.sql.functions import broadcast 

mapping = [ 
    (lower, upper, label) for ((lower, upper), label) 
    in zip(sliding_window(2, splits), labels) 
] 

lookup_df = sc.parallelize(mapping).toDF(["lower", "upper", "label"])

df.join(
    broadcast(lookup_df), 
    (col("weekly_sale") >= col("lower")) & (col("weekly_sale") < col("upper")) 
).drop("lower").drop("upper") 

or generate a lookup expression:

from functools import reduce 
from pyspark.sql.functions import when 

def in_range(c):
    def in_range_(acc, x):
        lower, upper, label = x
        return when(
            (c >= lit(lower)) & (c < lit(upper)), lit(label)
        ).otherwise(acc)
    return in_range_

label = reduce(in_range(col("weekly_sale")), mapping, lit(None)) 

df.withColumn("label", label) 

The least efficient approach is a UDF.
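For completeness, a minimal sketch of such a UDF, reusing the splits and labels defined above (the helper name label_for is illustrative):

from bisect import bisect_right
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def label_for(sale):
    # bisect_right over the inner split points picks the label index,
    # matching the half-open [lower, upper) intervals used by Bucketizer
    return labels[bisect_right(splits[1:-1], sale)]

label_udf = udf(label_for, StringType())

df.withColumn("label", label_udf(col("weekly_sale"))).show(10, False)

It works, but the per-row round trip through Python is why it tends to be slower than the Column-based expressions above.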


I am going to test it, thank you very much. I tried when in other functions, but it seems to accept only one condition. How can I give it several conditions? df_label = quantile_filtered.withColumn("label", func.when(quantile_filtered.total_sale < 100000, "under 100000").otherwise(0)) – sanaz
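A minimal sketch of chaining several conditions, assuming a DataFrame named quantile_filtered with a total_sale column as in the comment (when calls can be chained, with a single otherwise at the end):

from pyspark.sql import functions as func

df_label = quantile_filtered.withColumn(
    "label",
    func.when(quantile_filtered.total_sale < 100000, "under 100000")
        .when(quantile_filtered.total_sale < 200000, "between 100000 and 200000")
        .when(quantile_filtered.total_sale < 300000, "between 200000 and 300000")
        .otherwise("more than 300000")
)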


How would you evaluate this: df_label = quantile_filtered.withColumn("label", func.when((quantile_filtered.total_sale >= 3500) & (quantile_filtered.total_sale < 3800), "3500 to 3800").when((quantile_filtered.total_sale >= 3800) & (quantile_filtered.total_sale < 6000), "3800 to 6000").when(quantile_filtered.total_sale >= 6000, "above 6000")) – sanaz


Do you execute any action, show for example? – zero323