2017-03-03 46 views
1

什麼是錯,此代碼試圖改變一個datetime列的一天pyspark變化一天datetime列

import pyspark 
import pyspark.sql.functions as sf 
import pyspark.sql.types as sparktypes 
import datetime 

sc = pyspark.SparkContext(appName="test") 
sqlcontext = pyspark.SQLContext(sc) 

rdd = sc.parallelize([('a',datetime.datetime(2014, 1, 9, 0, 0)), 
         ('b',datetime.datetime(2014, 1, 27, 0, 0)), 
         ('c',datetime.datetime(2014, 1, 31, 0, 0))]) 
testdf = sqlcontext.createDataFrame(rdd, ["id", "date"]) 

print(testdf.show()) 
print(testdf.printSchema()) 

給出了測試數據框:

+---+--------------------+ 
| id|    date| 
+---+--------------------+ 
| a|2014-01-09 00:00:...| 
| b|2014-01-27 00:00:...| 
| c|2014-01-31 00:00:...| 
+---+--------------------+ 


root 
|-- id: string (nullable = true) 
|-- date: timestamp (nullable = true) 

然後我定義一個UDF改變天日期欄:

def change_day_(date, day): 
    return date.replace(day=day) 

change_day = sf.udf(change_day_, sparktypes.TimestampType()) 
testdf.withColumn("PaidMonth", change_day(testdf.date, 1)).show(1) 

這就提出了一個錯誤:

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace: 
py4j.Py4JException: Method col([class java.lang.Integer]) does not exist 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339) 
    at py4j.Gateway.invoke(Gateway.java:274) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 
+1

也許嘗試用'lit(1)'替換'1'(在調用'change_day')後,'從pyspark.sql.functions導入點亮'後執行'? –

+0

謝謝!這工作! – muon

+0

@ArthurTacca你能解釋一下爲什麼? – muon

回答

1

假設接收多個參數的udf接收多個。 「1」不是一列。

這意味着您可以執行以下操作之一。要麼讓它作爲意見建議欄:

testdf.withColumn("PaidMonth", change_day(testdf.date, lit(1))).show(1) 

亮(1)是那些

或使原有的函數返回一個高階函數的一列:

def change_day_(day): 
    return lambda date: date.replace(day=day) 

change_day = sf.udf(change_day_(1), sparktypes.TimestampType()) 
testdf.withColumn("PaidMonth", change_day(testdf.date)).show(1) 

這基本上創建了一個替換爲1的函數,因此可以接收一個整數。 udf將適用於單列。

+0

你的第二個解決方案的工作原理,但我不明白如何通過日期參數 – muon

+0

cahnge_day_函數創建一個函數。該函數使用change_day_中的day參數作爲常量。 –

+1

@muon換句話說,用'lit(1)'將第一個參數作爲參數傳遞給workers *上的'change_day' *。通過這個解決方案,'change_day _()'在驅動程序*上傳遞數字1 *,它返回一個函數,然後傳遞給工作人員。 (第二個函數包含數字1的隱藏副本,因此數字1仍然最終傳遞給所有工作人員。) –

0

感謝@ ArthurTacca的評論,關鍵是要使用pyspark.sql.functions.lit()功能如下:

testdf.withColumn("PaidMonth", change_day(testdf.date, sf.lit(1))).show() 

備選答案的歡迎!