2017-07-26 60 views
0

我在火花一個模式作爲如何在重新計算後替換spark數據框中的值?

root 
|-- atom: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- dailydata: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- datatimezone: string (nullable = true) 
| | | | |-- intervaltime: long (nullable = true) 
| | | | |-- intervalvalue: long (nullable = true) 
| | | | |-- utcacquisitiontime: string (nullable = true) 
| | |-- usage: string (nullable = true) 
| -- titlename: string (nullable = true) 

我已提取的utcacquisitiontime和從以上模式datatimezone如下

val result=q.selectExpr("explode(dailydata) as r").select("r.utcacquisitiontime","r.datatimezone") 

+--------------------+------------+ 
| utcacquisitiontime|datatimezone| 
+--------------------+------------+ 
|2017-03-27T22:00:00Z|  +02:00| 
|2017-03-27T22:15:00Z|  +02:00| 
|2017-03-27T22:30:00Z|  +02:00| 
|2017-03-27T22:45:00Z|  +02:00| 
|2017-03-27T23:00:00Z|  +02:00| 
|2017-03-27T23:15:00Z|  +02:00| 
|2017-03-27T23:30:00Z|  +02:00| 
|2017-03-27T23:45:00Z|  +02:00| 
|2017-03-28T00:00:00Z|  +02:00| 
|2017-03-28T00:15:00Z|  +02:00| 
|2017-03-28T00:30:00Z|  +02:00| 
|2017-03-28T00:45:00Z|  +02:00| 
|2017-03-28T01:00:00Z|  +02:00| 
|2017-03-28T01:15:00Z|  +02:00| 
|2017-03-28T01:30:00Z|  +02:00| 
|2017-03-28T01:45:00Z|  +02:00| 
|2017-03-28T02:00:00Z|  +02:00| 
|2017-03-28T02:15:00Z|  +02:00| 
|2017-03-28T02:30:00Z|  +02:00| 
|2017-03-28T02:45:00Z|  +02:00| 
+--------------------+------------+ 

我需要使用這兩個列計算localtime和由localtime替換它們經過計算。我應該如何計算localtime並替換它?

+3

可以使用'withColumn'方法上數據幀,並使用[火花功能](https://spark.apache.org/docs/2.0.2/api/java/o rg/apache/spark/sql/functions.html) –

回答

2

你可以依賴spark中的udf函數(User Defined Function)。另外在org.apache.sql.functions._中有很多已經預定義的函數可以幫助你。但這裏是你如何使這項工作

+-------------------+------------+ 
| utcacquisitiontime|datatimezone| 
+-------------------+------------+ 
|2017-03-27T22:00:00|  +02:00| 
+-------------------+------------+ 

請注意,我已經從時間列中刪除不必要的「Z」。 使用JodaTime依賴這樣定義一個UDF功能:

val toTimestamp = udf((time:String, zone:String) => { 
     val timezone = DateTimeZone.forID(zone) 
    val df = DateTimeFormat.forPattern("yyyy-mm-dd'T'HH:mm:ss") 
    new java.sql.Timestamp(df.withZone(timezone).parseDateTime(time).getMillis) 
    }) 

withColumn

df.withColumn("timestamp", toTimestamp(col("utcacquisitiontime"), col("datatimezone")) 

的結果顯示(注意,在該模式中的列時間戳類型的時間戳,所以你將它應用在列可以做在其上的日期操作)

+-------------------+------------+--------------------+ 
| utcacquisitiontime|datatimezone|   timestamp| 
+-------------------+------------+--------------------+ 
|2017-03-27T22:00:00|  +02:00|2017-01-27 22:00:...| 
+-------------------+------------+--------------------+ 

root 
|-- utcacquisitiontime: string (nullable = true) 
|-- datatimezone: string (nullable = true) 
|-- timestamp: timestamp (nullable = true) 
+0

謝謝!它幫助! – Ninja

+0

如何將'timestamp'追加到根?如果你能幫忙,那真的很感激。 – Ninja

+1

df.drop(「utcacquisitiontime」)。drop(「datatimezone」)。如果它對你有幫助,也請投票 – dumitru

0

您可以使用Joda Time API時間在DF列通過執行類似轉換爲本地時間,

def convertToLocal(str:String):String = new DateTime(str).toLocalDateTime().toString 

下一個導入SQL implicits通過,

import ss.implicits._ 

,其中SS是實例您的SparkSession。爲了utcacquisitiontime列localDateTime的每個元素進行轉換,做這樣的事情,

val df=result map(r=>(convertToLocal(r.getString(0)),r.getString(1))) 

df show 

讓我知道,如果這有助於。乾杯。

+0

感謝您的幫助! – Ninja

+0

你能不能幫助我如何追加這個新的df到原來的json模式,我有上面這樣'utcacquisitiontime' | 'datatimezone'被替換爲新的'localtimestamp'計算? – Ninja

相關問題