2017-06-16 38 views
0

我在Spark中有一個數據框,並且我在這個數據框上應用了一些過濾器。根據通過這些過濾器的記錄,我在數據框中添加了一個新列。請參閱下面的代碼:在Spark中簡化列

val rule_name = when(col("product_name").isin("N") && col("territory").isin("Metro","GA"), "rule1").otherwise(when(col("product_name").isin("XGE") && col("territory").isin("14721"), "rule2").otherwise("")) 
val df_final = df_join.withColumn("rule_name" , rule_name) 

爲了更好的可見性的目的,我想打破下面的代碼在多行:

val rule_name = when(col("product_name").isin("N") && col("territory").isin("Metro","GA"), "rule1").otherwise(when(col("product_name").isin("XGE") && col("territory").isin("14721"), "rule2").otherwise("")) 

有沒有辦法在Scala和pyspark實現這一目標的火花?

回答

0

您已經有了更好的解決方案。如果你想更好的可讀性,那麼你可以使用udf函數。但是udf函數需要將列序列化和反序列化。所以,你可以使用UDF功能

def rule_name = udf((product_name: String, territory: String) => { 
    val rule1ProductName = Array("N") 
    val rule2ProductName = Array("XGE") 
    val rule1Territory = Array("Metro", "GA") 
    val rule2Territory = Array("14721") 

    if(rule1ProductName.contains(product_name) && rule1Territory.contains(territory)) { 
    "rule1" 
    } 
    if(rule2ProductName.contains(product_name) && rule2Territory.contains(territory)) { 
    "rule2" 
    } 
    else "" 
}) 

可以調用UDF功能爲您提供

val df_final = df_join.withColumn("rule_name" , rule_name($"product_name", $"territory")) 
0

解決方案爲這個最優的:

val rule_name = when(col("product_name").isin("N") && col("territory").isin("Metro","GA"), "rule1").otherwise(when(col("product_name").isin("XGE") && col("territory").isin("14721"), "rule2").otherwise(""))