2017-08-02

Unable to call a UDF using callUDF() - Spark Java

I am trying to call a UDF with callUDF after registering it. However, the function validateNumber() is never invoked.

The code looks like this:

// Registers validateNumber as a Spark SQL UDF and adds its result as a new column.
public Dataset<Row> sampleCallUdf(Dataset<Row> dataset) {
    UDF2<Long, Long, String> validateNumber = (UDF2<Long, Long, String>) SampleClass::validateNumber;
    UDFRegistration udfRegister = CONFIG.getSparkSession().udf();
    udfRegister.register("validateNumber", validateNumber, DataTypes.StringType);

    // Apply the registered UDF to the "cookie" and "session" columns.
    return dataset.withColumn("rejection_reason",
            coalesce(callUDF("validateNumber", column("cookie"), column("session"))));
}

// Returns "correct" for a non-zero cookie and "incorrect" otherwise.
public static String validateNumber(Long cookie, Long session) {
    System.out.println("Into validateNumber function");
    if (cookie != 0) {
        return "correct";
    } else {
        return "incorrect";
    }
}

The input I am testing with is:

Dataset<Row> input = spark().createDataFrame(Arrays.asList(
        RowFactory.create("28/05/2017 00:12:34", 0L, -2864001245604480000L, "abc", "90.202.190.106", 123, "abc", "xyz", "mno"),
        RowFactory.create("28/05/2017 00:12:34", 2345678L, 2864001245604480000L, "abc", "90.202.190.106", 123, "abc", "xyz", "mno")),
        TEMP_TABLE);

The problem is that it does not even print the System.out statement inside validateNumber().
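One possible explanation, not confirmed in this thread: Spark evaluates Dataset transformations such as withColumn lazily, so a UDF body runs only once an action such as show() or count() is executed on the result. The snippet above returns the Dataset without showing any action. On a cluster, output printed inside a UDF would also go to the executor logs rather than the driver console. The sketch below is only an analogy in plain Java Streams, not Spark code: map() plays the role of the transformation and collect() the role of the action. The class name, the CALLS counter, and the one-argument validateNumber are hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: Java Streams stand in for Spark's lazy transformations.
class LazyEvaluationSketch {
    // Hypothetical counter recording how often the function body runs.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Simplified one-argument stand-in for the UDF in the question.
    static String validateNumber(Long cookie) {
        CALLS.incrementAndGet();
        return cookie != 0 ? "correct" : "incorrect";
    }

    public static void main(String[] args) {
        // Like withColumn: this only describes the computation.
        Stream<String> pending = Stream.of(0L, 2345678L)
                .map(LazyEvaluationSketch::validateNumber);
        System.out.println("calls before terminal operation: " + CALLS.get()); // prints 0

        // Like show() or count(): the terminal operation triggers the work.
        List<String> results = pending.collect(Collectors.toList());
        System.out.println("calls after terminal operation: " + CALLS.get()); // prints 2
        System.out.println(results); // prints [incorrect, correct]
    }
}
```

If the same holds for the Spark code, calling an action on the Dataset returned by sampleCallUdf should be enough to see whether the UDF is invoked.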

Works fine for me. Can you check the values in the dataset? – abaghel

@abaghel - Does it go into validateNumber()? – anukuls

Or could you let me know what input you used? – anukuls

Answer

Please find a sample program below.

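import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.coalesce;
import static org.apache.spark.sql.functions.col;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;

import scala.Tuple2;

public class SparkUDF {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkUDF")
                .master("local[*]")
                .getOrCreate();

        // Sample data: (cookie, session) pairs.
        List<Tuple2<Long, Long>> inputList = new ArrayList<>();
        inputList.add(new Tuple2<>(111L, 10011L));
        inputList.add(new Tuple2<>(0L, 20022L));

        // Build a Dataset with named columns.
        Dataset<Row> ds = spark
                .createDataset(inputList, Encoders.tuple(Encoders.LONG(), Encoders.LONG()))
                .toDF("cookie", "session");

        // Register the UDF, then call it by name.
        UDF2<Long, Long, String> validateNumber = (UDF2<Long, Long, String>) SparkUDF::validateNumber;
        spark.udf().register("validateNumber", validateNumber, DataTypes.StringType);
        Dataset<Row> ds1 = ds.withColumn("rejection_reason",
                coalesce(callUDF("validateNumber", col("cookie"), col("session"))));

        // show() is an action, so the UDF is executed here.
        ds1.show();
        spark.stop();
    }

    public static String validateNumber(Long cookie, Long session) {
        if (cookie != 0) {
            return "correct";
        } else {
            return "incorrect";
        }
    }
}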

You will get the following output.

+------+-------+----------------+
|cookie|session|rejection_reason|
+------+-------+----------------+
|   111|  10011|         correct|
|     0|  20022|       incorrect|
+------+-------+----------------+
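A side note on the validateNumber logic used in both snippets: the comparison cookie != 0 unboxes the Long, so a null cookie reaching the function would throw a NullPointerException. The following is a minimal sketch of a null-tolerant variant in plain Java. The class name NullSafeValidation is hypothetical, and treating a missing cookie as "incorrect" is an assumption that does not come from the thread.

```java
// Hypothetical null-tolerant variant of validateNumber from the thread.
class NullSafeValidation {
    static String validateNumber(Long cookie, Long session) {
        // Assumption: a missing cookie counts as "incorrect".
        if (cookie == null) {
            return "incorrect";
        }
        // Safe to unbox now that null has been ruled out.
        return cookie != 0L ? "correct" : "incorrect";
    }

    public static void main(String[] args) {
        System.out.println(validateNumber(2345678L, 2864001245604480000L)); // prints correct
        System.out.println(validateNumber(0L, -2864001245604480000L));      // prints incorrect
        System.out.println(validateNumber(null, 1L));                       // prints incorrect
    }
}
```

The method keeps the same two-argument signature, so it could be registered as a UDF2<Long, Long, String> in the same way as the original.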