2017-08-02 79 views
2

我對Spark和Scala相對較新。我有一個具有以下格式的數據幀:使用Scala獲取Spark數據集中對應於最新時間戳的行

| Col1 | Col2 | Col3 | Col_4 | Col_5 | Col_TS     | Col_7 | 

| 1234 | AAAA | 1111 | afsdf | ewqre | 1970-01-01 00:00:00.0 | false | 
| 1234 | AAAA | 1111 | ewqrw | dafda | 2017-01-17 07:09:32.748 | true | 
| 1234 | AAAA | 1111 | dafsd | afwew | 2015-01-17 07:09:32.748 | false | 
| 5678 | BBBB | 2222 | afsdf | qwerq | 1970-01-01 00:00:00.0 | true | 
| 5678 | BBBB | 2222 | bafva | qweqe | 2016-12-08 07:58:43.04 | false | 
| 9101 | CCCC | 3333 | caxad | fsdaa | 1970-01-01 00:00:00.0 | false | 

我需要做的是獲得對應於最新時間戳的行。 在上面的例子中,鍵是Col1,Col2和Col3。 Col_TS代表時間戳,Col_7是確定記錄有效性的布爾值。 我想要做的是找到一種方法來組合這些記錄基於密鑰,並保留具有最新時間戳的記錄。

所以在數據幀的操作的輸出上面應該是:

| Col1 | Col2 | Col3 | Col_4 | Col_5 | Col_TS     | Col_7 | 

| 1234 | AAAA | 1111 | ewqrw | dafda | 2017-01-17 07:09:32.748 | true | 
| 5678 | BBBB | 2222 | bafva | qweqe | 2016-12-08 07:58:43.04 | false | 
| 9101 | CCCC | 3333 | caxad | fsdaa | 1970-01-01 00:00:00.0 | false | 

我想出了部分解決,但這種方式我只能返回其上記錄分組列項的數據幀而不是其他欄目。

df = df.groupBy("Col1","Col2","Col3").agg(max("Col_TS")) 


| Col1 | Col2 | Col3 | max(Col_TS)    | 

| 1234 | AAAA | 1111 | 2017-01-17 07:09:32.748 | 
| 5678 | BBBB | 2222 | 2016-12-08 07:58:43.04 | 
| 9101 | CCCC | 3333 | 1970-01-01 00:00:00.0 | 

有人可以幫我想出一個Scala代碼來執行此操作嗎?

回答

2

一種選擇是通過Col_TS首先命令該數據幀,然後按Col1中col2的COL3並且彼此柱採取的最後一個項目:

val val_columns = Seq("Col_4", "Col_5", "Col_TS", "Col_7").map(x => last(col(x)).alias(x)) 

(df.orderBy("Col_TS") 
    .groupBy("Col1", "Col2", "Col3") 
    .agg(val_columns.head, val_columns.tail: _*).show) 

+----+----+----+-----+-----+--------------------+-----+ 
|Col1|Col2|Col3|Col_4|Col_5|    Col_TS|Col_7| 
+----+----+----+-----+-----+--------------------+-----+ 
|1234|AAAA|1111|ewqrw|dafda|2017-01-17 07:09:...| true| 
|9101|CCCC|3333|caxad|fsdaa|1970-01-01 00:00:...|false| 
|5678|BBBB|2222|bafva|qweqe|2016-12-08 07:58:...|false| 
+----+----+----+-----+-----+--------------------+-----+ 
+0

謝謝你的回覆@Psidom。我得到了整體思路,但是能否描述'agg(val_columns.head,val_columns.tail:_ *)'部分是做什麼的? –

+1

'val_columns'是聚合表達式的一個序列,'agg(val_columns.head,val_columns.tail:_ *)'將'val_columns'中的每個元素作爲單獨的參數應用於'agg'函數,這相當於'agg last(col(「Col_4」)).alias(「Col_4」),last(col(「Col_5」)).alias(「Col_5」),... etc',但只是更短,更編程化 – Psidom

+0

非常好!如此簡潔,謝謝 –

4

你可以使用window功能如下

import org.apache.spark.sql.functions._ 
val windowSpec = Window.partitionBy("Col1","Col2","Col3").orderBy(col("Col_TS").desc) 

df.withColumn("maxTS", first("Col_TS").over(windowSpec)) 
.select("*").where(col("maxTS") === col("Col_TS")) 
.drop("maxTS") 
    .show(false) 

你應該得到o輸出如下

+----+----+----+-----+-----+----------------------+-----+ 
|Col1|Col2|Col3|Col_4|Col_5|Col_TS    |Col_7| 
+----+----+----+-----+-----+----------------------+-----+ 
|5678|BBBB|2222|bafva|qweqe|2016-12-0807:58:43.04 |false| 
|1234|AAAA|1111|ewqrw|dafda|2017-01-1707:09:32.748|true | 
|9101|CCCC|3333|caxad|fsdaa|1970-01-0100:00:00.0 |false| 
+----+----+----+-----+-----+----------------------+-----+