2017-08-24 96 views
2

我試圖在spark中實現UDF;可以將文字和列作爲參數。爲了達到這個目的,我相信我可以使用咖喱UDF。Curried UDF - Pyspark

該函數用於將字符串文字與DataFrame列中的每個值進行匹配。我總結了下面的代碼: -

def matching(match_string_1): 
    def matching_inner(match_string_2): 
     return difflib.SequenceMatcher(None, match_string_1, match_string_2).ratio() 
    return matching 

hc.udf.register("matching", matching) 
matching_udf = F.udf(matching, StringType()) 

df_matched = df.withColumn("matching_score", matching_udf(lit("match_string"))(df.column)) 
  • "match_string"實際上是分配給我遍歷一個列表中的值。

不幸的是,這並不是我所希望的;我正在收到

"TypeError: 'Column' object is not callable".

我相信我沒有正確調用此函數。

回答

2

應該是這樣的:

def matching(match_string_1): 
    def matching_inner(match_string_2): 
     return difflib.SequenceMatcher(
      a=match_string_1, b=match_string_2).ratio() 

    # Here create udf. 
    return F.udf(matching_inner, StringType()) 

df.withColumn("matching_score", matching("match_string")(df.column)) 

如果你想支持Column論據match_string_1你必須重寫它是這樣的:

def matching(match_string_1): 
    def matching_inner(match_string_2): 
     return F.udf(
      lambda a, b: difflib.SequenceMatcher(a=a, b=b).ratio(), 
      StringType())(match_string_1, match_string_2) 

    return matching_inner 

df.withColumn("matching_score", matching(F.lit("match_string"))(df.column) 

您當前的代碼不工作,matching_udf是UDF和matching_udf(lit("match_string"))創建一個Column表達式而不是調用內部函數。