2017-08-16 80 views
2

我想使用一個特定的UDF使用Spark星火如何使用UDF與加入

這裏的計劃:

我有一個table A(10萬行)和table B(15百萬行)

我想用table A和的table B 之一的UDF比較一個元素是有可能

下面是我的代碼示例。在某些時候,我還需要說我UDF比較必須大於0,9更大:

DataFrame dfr = df 
       .select("name", "firstname", "adress1", "city1","compare(adress1,adress2)") 
       .join(dfa,df.col("adress1").equalTo(dfa.col("adress2")) 
         .and((df.col("city1").equalTo(dfa.col("city2")) 
           ...; 

這可能嗎?

回答

2

是的,可以。但是它會比正常的運營速度較慢,如星火將不能做謂語下推

例子:

val udf = udf((x : String, y : String) => { here compute similarity; }); 
val df3 = df1.join(df2, udf(df1.field1, df2.field1) > 0.9) 

例如:

val df1 = Seq (1, 2, 3, 4).toDF("x") 
val df2 = Seq(1, 3, 7, 11).toDF("q") 
val udf = org.apache.spark.sql.functions.udf((x : Int, q : Int) => { Math.abs(x - q); }); 
val df3 = df1.join(df2, udf(df1("x"), df2("q")) > 1) 

您也可以直接從用戶返回布爾定義函數