這可以使用窗口函數完成。
from datetime import datetime

from pyspark.sql import Row, functions as W
from pyspark.sql.window import Window
def tm(str):
    """Parse a ``YYYY/MM/DD HH:MM:SS`` timestamp string into a ``datetime``.

    Args:
        str: Timestamp text such as ``"2016/07/18 12:00:00"``. The name
            shadows the builtin; it is kept so existing callers still work.

    Returns:
        The naive ``datetime`` produced by ``datetime.strptime``.

    Raises:
        ValueError: If the text does not match ``%Y/%m/%d %H:%M:%S``.
    """
    return datetime.strptime(str, "%Y/%m/%d %H:%M:%S")
# Sample data: one (user, time, bus) row per user.
userTime = [
    Row(user="A", time=tm("2016/07/18 12:00:00"), bus=1),
    Row(user="B", time=tm("2016/07/19 12:00:00"), bus=2),
    Row(user="C", time=tm("2016/07/20 12:00:00"), bus=3),
]
# Sample data: (bus, time, stop) rows, several per bus.
busTime = [
    Row(bus=1, time=tm("2016/07/18 11:59:40"), stop="sA"),
    Row(bus=1, time=tm("2016/07/18 11:59:50"), stop="sB"),
    Row(bus=1, time=tm("2016/07/18 12:00:05"), stop="sC"),
    Row(bus=2, time=tm("2016/07/19 11:59:40"), stop="sB"),
    Row(bus=2, time=tm("2016/07/19 12:00:10"), stop="sC"),
    Row(bus=3, time=tm("2016/07/20 11:59:55"), stop="sD"),
    Row(bus=3, time=tm("2016/07/20 12:00:10"), stop="sE"),
]
# Build DataFrames from the sample rows and alias them so the join condition
# can qualify the shared "bus" column by side.
# NOTE(review): `sc` is presumably the SparkContext supplied by the PySpark
# shell; it is not defined in this file.
userDf = sc.parallelize(userTime).toDF().alias("usertime")
busDf = sc.parallelize(busTime).toDF().alias("bustime")
# Use W.col: this file imports pyspark.sql.functions only as `W`, so a bare
# col() is not a defined name here.
joinedDF = userDf.join(
    busDf,
    W.col("usertime.bus") == W.col("bustime.bus"),
    "inner",
).select(
    userDf.user,
    userDf.time.alias("user_time"),
    busDf.bus,
    busDf.time.alias("bus_time"),
    busDf.stop,
)
# Absolute gap, in seconds, between each bus timestamp and the user's timestamp.
# W.abs is the Spark column function; the Python builtin abs() does not
# operate on Column expressions.
additional_cols = joinedDF.withColumn(
    "bus_time_diff",
    W.abs(
        W.unix_timestamp(W.col("bus_time"))
        - W.unix_timestamp(W.col("user_time"))
    ),
)
# Number the rows of each (user, bus) partition by ascending gap and keep the
# first one. row_number() replaces rowNumber(), which was deprecated in
# Spark 1.6 and removed in 2.0.
partDf = additional_cols.select(
    "user",
    "user_time",
    "bus",
    "bus_time",
    "stop",
    "bus_time_diff",
    W.row_number()
    .over(Window.partitionBy("user", "bus").orderBy("bus_time_diff"))
    .alias("rank"),
).filter(W.col("rank") == 1)
# Print up to 20 rows of each frame without truncating cell contents.
additional_cols.show(20, False)
partDf.show(20, False)
輸出:
+----+---------------------+---+---------------------+----+-------------+
|user|user_time |bus|bus_time |stop|bus_time_diff|
+----+---------------------+---+---------------------+----+-------------+
|A |2016-07-18 12:00:00.0|1 |2016-07-18 11:59:40.0|sA |20 |
|A |2016-07-18 12:00:00.0|1 |2016-07-18 11:59:50.0|sB |10 |
|A |2016-07-18 12:00:00.0|1 |2016-07-18 12:00:05.0|sC |5 |
|B |2016-07-19 12:00:00.0|2 |2016-07-19 11:59:40.0|sB |20 |
|B |2016-07-19 12:00:00.0|2 |2016-07-19 12:00:10.0|sC |10 |
|C |2016-07-20 12:00:00.0|3 |2016-07-20 11:59:55.0|sD |5 |
|C |2016-07-20 12:00:00.0|3 |2016-07-20 12:00:10.0|sE |10 |
+----+---------------------+---+---------------------+----+-------------+
+----+---------------------+---+---------------------+----+-------------+----+
|user|user_time |bus|bus_time |stop|bus_time_diff|rank|
+----+---------------------+---+---------------------+----+-------------+----+
|A |2016-07-18 12:00:00.0|1 |2016-07-18 12:00:05.0|sC |5 |1 |
|B |2016-07-19 12:00:00.0|2 |2016-07-19 12:00:10.0|sC |10 |1 |
|C |2016-07-20 12:00:00.0|3 |2016-07-20 11:59:55.0|sD |5 |1 |
+----+---------------------+---+---------------------+----+-------------+----+
這非常不錯,你解決了我的問題,非常感謝! – Finn
不客氣! –