2016-04-22 164 views
0

一個數組Pyspark DF列這是我的pyspark數據框模式:更新基於另一列

root 
|-- user: string (nullable = true) 
|-- table: string (nullable = true) 
|-- changeDate: string (nullable = true) 
|-- fieldList: string (nullable = true) 
|-- id: string (nullable = true) 
|-- value2: integer (nullable = false) 
|-- value: double (nullable = false) 
|-- name: string (nullable = false) 
|-- temp: array (nullable = true) 
| |-- element: string (containsNull = true) 
|-- num_cols_changed: integer (nullable = true) 

在數據幀中的數據:

+--------+-----+--------------------+--------------------+------+------+-----+----+--------------------+----------------+ 
| user|table|   changeDate|   fieldList|  id|value2|value|name|    temp|num_cols_changed| 
+--------+-----+--------------------+--------------------+------+------+-----+----+--------------------+----------------+ 
| user11 | TAB1| 2016-01-24 19:10...|   value2 = 100|555555| 200| 0.5| old|  [value2 = 100]|    1| 
| user01 | TAB1| 2015-12-31 13:12...|value = 0.34,name=new| 1111| 200| 0.5| old|[value = 0.34, n...|    2| 
+--------+-----+--------------------+--------------------+------+------+-----+----+--------------------+----------------+ 

我想讀的臨時數組列,並基於這些值,我想更改數據框中的列。例如,第一行只有一列被更改,即value 2,所以我想用新的值100更新列df.value2。同樣,在下一行中,更改了2列,所以我需要提取值和名稱與他們的值並更新數據框中的適當列。所以輸出應該是這樣的:

+--------+-----+--------------------+------+------+-----+----+ 
| user|table|   changeDate| id|value2|value|name| 
+--------+-----+--------------------+------+------+-----+----+ 
| user11 | TAB1| 2016-01-24 19:10...|555555| 100| 0.5| old| 
| user01 | TAB1| 2015-12-31 13:12...| 1111| 200| 0.34| new| 
+--------+-----+--------------------+------+------+-----+----+ 

我想記住程序的性能,因此在僅僅使用dataframes方式聚焦,但如果沒有選擇我可以去RDD路線了。 基本上,我不知道如何在一行中處理多個值然後進行比較。我知道我可以使用column in df.columns來比較列名,但是對於使用數組的每一行這樣做會讓我感到困惑。任何幫助或新想法表示讚賞。

回答

0

這是我如何解決這個使用explode

df = df.withColumn('temp', split(df.fieldList, ',')) 
df = df.withColumn('cols', explode(df.temp)) 
df = df.withColumn('col_value', split(df.cols, '=')) 
df = df.withColumn('deltaCol', df.col_value[0]) 
     .withColumn('deltaValue',df.col_value[1]) 

上述的最終輸出(下降無關列後),因此本:

+------+-----+--------+--------------------+--------+----------+ 
| id|table| user|   changeDate|deltaCol|deltaValue| 
+------+-----+--------+--------------------+--------+----------+ 
|555555| TAB2| user11 | 2016-01-24 19:10...| value2 |  100| 
| 1111| TAB1| user01 | 2015-12-31 13:12...| value |  0.34| 
| 1111| TAB1| user01 | 2015-12-31 13:12...| name | 'newName'| 
+------+-----+--------+--------------------+--------+----------+ 

這個我把它註冊爲一個表格之後並執行SQL操作以轉發數據:

>>> res = sqlContext.sql("select id, table, user, changeDate, max(value2) as value2, max(value) as value, max(name) as name \ 
... from (select id, table, user, changeDate, case when trim(deltaCol) == 'value2' then deltaValue else Null end value2,\ 
... case when trim(deltaCol) == 'value' then deltaValue else Null end value,\ 
... case when trim(deltaCol) == 'name' then deltaValue else Null end name from delta) t group by id, table, user, changeDate") 

這樣的結果是:

+------+-----+--------+--------------------+------+-----+----------+ 
| id|table| user|   changeDate|value2|value|  name| 
+------+-----+--------+--------------------+------+-----+----------+ 
|555555| TAB2| user11 | 2016-01-24 19:10...| 100| null|  null| 
| 1111| TAB1| user01 | 2015-12-31 13:12...| null| 0.34| 'newName'| 
+------+-----+--------+--------------------+------+-----+----------+ 

對於這個碼與不同的表的使用,I使用的主DF(我的最終目標表)的列,以製備列的字符串:

>>> string = [(", max(" + c + ") as " + c) for c in masterDF.columns] 
>>> string = "".join(string) 
>>> string 
', max(id) as id, max(value) as value, max(name) as name, max(value2) as value2'