PySpark: how to create a DataFrame from another DataFrame

I am using PySpark v1.6.1 and I want to create a DataFrame from an existing one by:
- converting a column that holds a struct of three values into three separate columns
- converting a timestamp from string to datetime
- creating more columns from that timestamp
- changing the names and types of some columns

Right now I am using .map(func), which creates an RDD by applying a function that takes a Row of the original type and returns a new Row. But this creates an RDD, and I would rather not do that.

Is there a better way to do this?

Thanks!
Hope this helps!
from pyspark.sql.functions import unix_timestamp, col, to_date, struct
####
#sample data
####
df = sc.parallelize([[25, 'Prem', 'M', '12-21-2006 11:00:05','abc', '1'],
[20, 'Kate', 'F', '05-30-2007 10:05:00', 'asdf', '2'],
[40, 'Cheng', 'M', '12-30-2017 01:00:01', 'qwerty', '3']]).\
toDF(["age","name","sex","datetime_in_strFormat","initial_col_name","col_in_strFormat"])
#create 'struct' type column by combining first 3 columns of sample data - (this is built to answer query #1)
df = df.withColumn("struct_col", struct('age', 'name', 'sex')).\
drop('age', 'name', 'sex')
df.show()
df.printSchema()
####
#query 1
####
#split a column holding a struct of three values (i.e. 'struct_col') into separate columns (i.e. 'name', 'age' & 'sex')
df = df.withColumn('name', col('struct_col.name')).\
withColumn('age', col('struct_col.age')).\
withColumn('sex', col('struct_col.sex')).\
drop('struct_col')
df.show()
df.printSchema()
####
#query 2
####
#Convert the timestamp from string (i.e. 'datetime_in_strFormat') to timestamp type (i.e. 'datetime_in_tsFormat')
#note: 'HH' is the 24-hour pattern; 'hh' is 12-hour and would misparse afternoon times
df = df.withColumn('datetime_in_tsFormat',
                   unix_timestamp(col('datetime_in_strFormat'), 'MM-dd-yyyy HH:mm:ss').cast("timestamp"))
df.show()
df.printSchema()
####
#query 3
####
#create more columns using above timestamp (e.g. fetch date value from timestamp column)
df = df.withColumn('datetime_in_dateFormat', to_date(col('datetime_in_tsFormat')))
df.show()
####
#query 4.a
####
#Change column name (e.g. 'initial_col_name' is renamed to 'new_col_name')
df = df.withColumnRenamed('initial_col_name', 'new_col_name')
df.show()
####
#query 4.b
####
#Change column type (e.g. string type in 'col_in_strFormat' is converted to double type in 'col_in_doubleFormat')
df = df.withColumn("col_in_doubleFormat", col('col_in_strFormat').cast("double"))
df.show()
df.printSchema()
Sample data:
+---------------------+----------------+----------------+------------+
|datetime_in_strFormat|initial_col_name|col_in_strFormat| struct_col|
+---------------------+----------------+----------------+------------+
| 12-21-2006 11:00:05| abc| 1| [25,Prem,M]|
| 05-30-2007 10:05:00| asdf| 2| [20,Kate,F]|
| 12-30-2017 01:00:01| qwerty| 3|[40,Cheng,M]|
+---------------------+----------------+----------------+------------+
root
|-- datetime_in_strFormat: string (nullable = true)
|-- initial_col_name: string (nullable = true)
|-- col_in_strFormat: string (nullable = true)
|-- struct_col: struct (nullable = false)
| |-- age: long (nullable = true)
| |-- name: string (nullable = true)
| |-- sex: string (nullable = true)
Final output data:
+---------------------+------------+----------------+-----+---+---+--------------------+----------------------+-------------------+
|datetime_in_strFormat|new_col_name|col_in_strFormat| name|age|sex|datetime_in_tsFormat|datetime_in_dateFormat|col_in_doubleFormat|
+---------------------+------------+----------------+-----+---+---+--------------------+----------------------+-------------------+
| 12-21-2006 11:00:05| abc| 1| Prem| 25| M| 2006-12-21 11:00:05| 2006-12-21| 1.0|
| 05-30-2007 10:05:00| asdf| 2| Kate| 20| F| 2007-05-30 10:05:00| 2007-05-30| 2.0|
| 12-30-2017 01:00:01| qwerty| 3|Cheng| 40| M| 2017-12-30 01:00:01| 2017-12-30| 3.0|
+---------------------+------------+----------------+-----+---+---+--------------------+----------------------+-------------------+
root
|-- datetime_in_strFormat: string (nullable = true)
|-- new_col_name: string (nullable = true)
|-- col_in_strFormat: string (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- sex: string (nullable = true)
|-- datetime_in_tsFormat: timestamp (nullable = true)
|-- datetime_in_dateFormat: date (nullable = true)
|-- col_in_doubleFormat: double (nullable = true)
If you are already able to create the RDD, you can easily transform it into a DF. –
But I don't want to create an RDD; I want to avoid using RDDs because they are a performance bottleneck for Python. I just want to do DF transformations. – frm
Please provide some code showing what you have already tried so we can help. In the meantime, check out 'pyspark.sql.functions.udf' and the 'withColumn()' method of Spark DataFrames. – pault