2015-10-19 79 views
9

我有一個Spark SQL DataFrame與數據,我想要得到的是在給定的日期範圍內當前行之前的所有行。所以舉個例子,我希望在給定行之前的7天之後有所有的行。我想我需要使用Window Function比如:Spark窗口函數 - 範圍之間的日期

Window \ 
    .partitionBy('id') \ 
    .orderBy('start') 

問題在於這裏。我希望7天有rangeBetween,但Spark文檔中沒有任何內容可以找到。 Spark是否提供這樣的選擇?現在我只是讓所有的前行與:

.rowsBetween(-sys.maxsize, 0) 

,但想實現這樣的:

.rangeBetween("7 days", 0) 

如果有人可以幫助我在這一個,我會非常感激。提前致謝!

回答

21

據我所知,在Spark和Hive中都不可能直接存在。兩者都要求與RANGE一起使用的ORDER BY子句是數字。我發現的最接近的是轉換爲時間戳,並在秒鐘上運行。假設start列包含date類型:

from pyspark.sql import Row 

row = Row("id", "start", "some_value") 
df = sc.parallelize([ 
    row(1, "2015-01-01", 20.0), 
    row(1, "2015-01-06", 10.0), 
    row(1, "2015-01-07", 25.0), 
    row(1, "2015-01-12", 30.0), 
    row(2, "2015-01-01", 5.0), 
    row(2, "2015-01-03", 30.0), 
    row(2, "2015-02-01", 20.0) 
]).toDF().withColumn("start", col("start").cast("date")) 

一個小幫手和窗口定義:

from pyspark.sql.window import Window 
from pyspark.sql.functions import mean, col 


# Hive timestamp is interpreted as UNIX timestamp in seconds* 
days = lambda i: i * 86400 

最後查詢:

w = (Window() 
    .partitionBy(col("id")) 
    .orderBy(col("start").cast("timestamp").cast("long")) 
    .rangeBetween(-days(7), 0)) 

df.select(col("*"), mean("some_value").over(w).alias("mean")).show() 

## +---+----------+----------+------------------+ 
## | id|  start|some_value|    mean| 
## +---+----------+----------+------------------+ 
## | 1|2015-01-01|  20.0|    20.0| 
## | 1|2015-01-06|  10.0|    15.0| 
## | 1|2015-01-07|  25.0|18.333333333333332| 
## | 1|2015-01-12|  30.0|21.666666666666668| 
## | 2|2015-01-01|  5.0|    5.0| 
## | 2|2015-01-03|  30.0|    17.5| 
## | 2|2015-02-01|  20.0|    20.0| 
## +---+----------+----------+------------------+ 

遠離漂亮,但工程。


* Hive Language Manual, Types

+0

感謝,我想的是類似的東西,最好有它證實! – Nhor