1
我即將開發一個函數,該函數使用spark sql爲每列執行操作。在這個函數中,我需要引用列名:Spark SQL以編程方式引用列
val input = Seq(
(0, "A", "B", "C", "D"),
(1, "A", "B", "C", "D"),
(0, "d", "a", "jkl", "d"),
(0, "d", "g", "C", "D"),
(1, "A", "d", "t", "k"),
(1, "d", "c", "C", "D"),
(1, "c", "B", "C", "D")
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
下面的例子明確地通過'column
表示列工作正常。
val pre1_1 = input.groupBy('col1).agg(mean($"TARGET").alias("pre_col1"))
val pre2_1 = input.groupBy('col1, 'TARGET).agg(count("*")/input.filter('TARGET === 1).count alias ("pre2_col1"))
input.as('a)
.join(pre1_1.as('b), $"a.col1" === $"b.col1").drop($"b.col1")
.join(pre2_1.as('b), ($"a.col1" === $"b.col1") and ($"a.TARGET" === $"b.TARGET")).drop($"b.col1").drop($"b.TARGET").show
When referring to the columns programmatically they can no longer be resolved. When 2 joins are performed one after the other which worked fine for the code snippet above.
我可以觀察到,對於此代碼段的第一和初始的df
col1
從開始到結束移動。可能這是它不能再解決的原因。 但是到目前爲止,我無法弄清楚如何在只傳遞字符串時如何訪問列/如何正確引用函數中的名稱。
val pre1_1 = input.groupBy("col1").agg(mean('TARGET).alias("pre_" + "col1"))
val pre2_1 = input.groupBy("col1", "TARGET").agg(count("*")/input.filter('TARGET === 1).count alias ("pre2_" + "col1"))
input.join(pre1_1, input("col1") === pre1_1("col1")).drop(pre1_1("col1"))
.join(pre2_1, (input("col1") === pre2_1("col1")) and (input("TARGET") === pre2_1("TARGET"))).drop(pre2_1("col1")).drop(pre2_1("TARGET"))
以及像一個替代方法:
df.as('a)
.join(pre1_1.as('b), $"a.${col}" === $"b.${col}").drop($"b.${col}")
沒有成功,因爲$"a.${col}"
不再被解析爲a.Column
而是df("a.col1")
不存在。