-1

I have this JSON data, and I want to bucket the 'timestamp' column by hour while summing the data in 'a' and 'b'. How can I change the values of that column in PySpark before using groupby on it?

{"a":1 , "b":1, "timestamp":"2017-01-26T01:14:55.719214Z"} 
{"a":1 , "b":1,"timestamp":"2017-01-26T01:14:55.719214Z"} 
{"a":1 , "b":1,"timestamp":"2017-01-26T02:14:55.719214Z"} 
{"a":1 , "b":1,"timestamp":"2017-01-26T03:14:55.719214Z"} 

This is the output I want:

{"a":2 , "b":2, "timestamp":"2017-01-26T01:00:00"} 
{"a":1 , "b":1,"timestamp":"2017-01-26T02:00:00"} 
{"a":1 , "b":1,"timestamp":"2017-01-26T03:00:00"} 

This is what I have written so far:

from pyspark.sql import functions as f

df = spark.read.json(inputfile)
df2 = df.groupby("timestamp").agg(f.sum(df["a"]), f.sum(df["b"]))

But how should I change the values of the 'timestamp' column before calling the groupby function? Thanks in advance!

+1

This [answer](http://stackoverflow.com/a/34232633/2708667) may be helpful. It shows how to round parsed timestamp objects. – santon

Answers

1
from pyspark.sql import functions as f 

df = spark.read.load(path='file:///home/zht/PycharmProjects/test/disk_file', format='json') 
# cast the string to a timestamp and shift it with to_utc_timestamp
# (this shift is why the hours in the output below differ from the input)
df = df.withColumn('ts', f.to_utc_timestamp(df['timestamp'], 'EST'))
# bucket rows into 1-hour tumbling windows and sum a and b per window
win = f.window(df['ts'], windowDuration='1 hour')
df = df.groupBy(win).agg(f.sum(df['a']).alias('sumA'), f.sum(df['b']).alias('sumB'))
# keep only the window start instead of the full [start, end] struct
res = df.select(df['window']['start'].alias('start_time'), df['sumA'], df['sumB'])
res.show(truncate=False) 

# output: 
+---------------------+----+----+
|start_time           |sumA|sumB|
+---------------------+----+----+
|2017-01-26 15:00:00.0|1   |1   |
|2017-01-26 16:00:00.0|1   |1   |
|2017-01-26 14:00:00.0|2   |2   |
+---------------------+----+----+

f.window is much more flexible.
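If you only need the start of each hourly window as a plain string (as asked in the comments below), here is a minimal sketch building on the `res` DataFrame above; it is my suggestion rather than part of the original answer, and it assumes `f.date_format` is available (it is a standard function in `pyspark.sql.functions`):

from pyspark.sql import functions as f

# format the window start as 'yyyy-MM-dd HH:mm:ss' so the trailing '.0'
# (fractional seconds) no longer appears in the output
res2 = res.withColumn('start_time', f.date_format(res['start_time'], 'yyyy-MM-dd HH:mm:ss'))
res2.show(truncate=False)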

+0

Thanks for your answer. Actually I only need '2017-01-26 15:00:00' in the timestamp column, not '[2017-01-26 15:00:00.0, 2017-01-26 16:00:00.0]'. Do you know how I can get that? – gashu

+0

I updated the code. –

+0

Thanks for your reply. It works fine :) – gashu

1

I think this is one way to do it:

df2 = df.withColumn("timestamp", df["timestamp"].substr(1, 13)).groupby("timestamp").agg(f.sum("a"), f.sum("b"))

Is there a better way to get the timestamp in the required format?
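One possible alternative, as a sketch only: on Spark 2.3+ you could truncate the parsed timestamp to the hour with `f.date_trunc` and group on that, which keeps the column as a real timestamp instead of a string prefix. The column and variable names below are just illustrative.

from pyspark.sql import functions as f

df = spark.read.json(inputfile)
# parse the ISO-8601 string and truncate it to the start of the hour,
# e.g. 2017-01-26T01:14:55.719214Z -> 2017-01-26 01:00:00 (in the session time zone)
hourly = df.withColumn("timestamp", f.date_trunc("hour", f.to_timestamp(df["timestamp"])))
result = hourly.groupby("timestamp").agg(f.sum("a").alias("a"), f.sum("b").alias("b"))
result.show(truncate=False)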