2016-07-27

Finding the closest time between two tables in Spark. I am using pyspark and I have two dataframes like this:

user   time   bus 
A 2016/07/18 12:00:00 1 
B 2016/07/19 12:00:00 2 
C 2016/07/20 12:00:00 3 

bus   time   stop 
1 2016/07/18 11:59:40 sA 
1 2016/07/18 11:59:50 sB 
1 2016/07/18 12:00:05 sC 
2 2016/07/19 11:59:40 sB 
2 2016/07/19 12:00:10 sC 
3 2016/07/20 11:59:55 sD 
3 2016/07/20 12:00:10 sE 

Now, for each user, I want to find the stop in the second table whose time is closest to the user's reported time, matched on bus number.

For example, in table 1, user A reported at 2016/07/18 12:00:00 that he was on bus 1. Among the records for bus 1 in the second table, the closest time is 2016/07/18 12:00:05 (the third record), so user A is currently at stop sC.
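The closest-time rule described above can be sketched in plain Python, independent of Spark (the `nearest_stop` helper is hypothetical, not part of the question's code):

```python
from datetime import datetime

def nearest_stop(user_time, bus_records):
    """Return the stop whose time is closest to user_time.

    bus_records is a list of (time, stop) tuples for a single bus.
    """
    closest = min(bus_records,
                  key=lambda rec: abs((rec[0] - user_time).total_seconds()))
    return closest[1]

# Records for bus 1 from the second table.
bus1 = [
    (datetime(2016, 7, 18, 11, 59, 40), "sA"),
    (datetime(2016, 7, 18, 11, 59, 50), "sB"),
    (datetime(2016, 7, 18, 12, 0, 5), "sC"),
]
print(nearest_stop(datetime(2016, 7, 18, 12, 0, 0), bus1))  # sC
```

The time differences are 20 s, 10 s, and 5 s, so the 12:00:05 record wins.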

The desired output should look like this:

user   time   bus stop 
A 2016/07/18 12:00:00 1 sC 
B 2016/07/19 12:00:00 2 sC 
C 2016/07/20 12:00:00 3 sD 

I have already converted the times into timestamps, so the only remaining problem is finding the closest timestamp among the rows where the bus numbers are equal.
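For reference, converting the `yyyy/MM/dd HH:mm:ss` strings with `datetime.strptime` makes the times directly comparable in seconds (a minimal sketch matching the format used in the tables above):

```python
from datetime import datetime

FMT = "%Y/%m/%d %H:%M:%S"
user_t = datetime.strptime("2016/07/18 12:00:00", FMT)
bus_t = datetime.strptime("2016/07/18 12:00:05", FMT)

# Absolute gap in seconds between the user's report and the bus record.
diff = abs((bus_t - user_t).total_seconds())
print(diff)  # 5.0
```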

Since I am not familiar with SQL, I tried to use a map function to find the closest time and stop, which meant calling sqlContext.sql inside the map function, and Spark does not seem to allow that:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it runs on workers. For more information, see SPARK-5063.

So how can I write a SQL query to get the correct output?

Answer


This can be done using window functions.

from datetime import datetime

from pyspark.sql import Row, functions as W
from pyspark.sql.functions import abs, col, unix_timestamp
from pyspark.sql.window import Window

def tm(s):
    return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

# set up data
userTime = [Row(user="A", time=tm("2016/07/18 12:00:00"), bus=1)]
userTime.append(Row(user="B", time=tm("2016/07/19 12:00:00"), bus=2))
userTime.append(Row(user="C", time=tm("2016/07/20 12:00:00"), bus=3))

busTime = [Row(bus=1, time=tm("2016/07/18 11:59:40"), stop="sA")]
busTime.append(Row(bus=1, time=tm("2016/07/18 11:59:50"), stop="sB"))
busTime.append(Row(bus=1, time=tm("2016/07/18 12:00:05"), stop="sC"))
busTime.append(Row(bus=2, time=tm("2016/07/19 11:59:40"), stop="sB"))
busTime.append(Row(bus=2, time=tm("2016/07/19 12:00:10"), stop="sC"))
busTime.append(Row(bus=3, time=tm("2016/07/20 11:59:55"), stop="sD"))
busTime.append(Row(bus=3, time=tm("2016/07/20 12:00:10"), stop="sE"))

# create DataFrames
userDf = sc.parallelize(userTime).toDF().alias("usertime")
busDf = sc.parallelize(busTime).toDF().alias("bustime")

# join the two tables on bus number
joinedDF = userDf.join(busDf, col("usertime.bus") == col("bustime.bus"), "inner").select(
    userDf.user,
    userDf.time.alias("user_time"),
    busDf.bus,
    busDf.time.alias("bus_time"),
    busDf.stop)

# absolute difference, in seconds, between the user's report time and each stop time
additional_cols = joinedDF.withColumn(
    "bus_time_diff",
    abs(unix_timestamp(col("bus_time")) - unix_timestamp(col("user_time"))))

# rank the stops per (user, bus) by that difference and keep only the closest one
partDf = additional_cols.select(
    "user", "user_time", "bus", "bus_time", "stop", "bus_time_diff",
    W.row_number()  # rowNumber() on Spark < 1.6
     .over(Window.partitionBy("user", "bus").orderBy("bus_time_diff"))
     .alias("rank")
).filter(col("rank") == 1)

additional_cols.show(20, False)
partDf.show(20, False)

Output:

+----+---------------------+---+---------------------+----+-------------+ 
|user|user_time   |bus|bus_time    |stop|bus_time_diff| 
+----+---------------------+---+---------------------+----+-------------+ 
|A |2016-07-18 12:00:00.0|1 |2016-07-18 11:59:40.0|sA |20   | 
|A |2016-07-18 12:00:00.0|1 |2016-07-18 11:59:50.0|sB |10   | 
|A |2016-07-18 12:00:00.0|1 |2016-07-18 12:00:05.0|sC |5   | 
|B |2016-07-19 12:00:00.0|2 |2016-07-19 11:59:40.0|sB |20   | 
|B |2016-07-19 12:00:00.0|2 |2016-07-19 12:00:10.0|sC |10   | 
|C |2016-07-20 12:00:00.0|3 |2016-07-20 11:59:55.0|sD |5   | 
|C |2016-07-20 12:00:00.0|3 |2016-07-20 12:00:10.0|sE |10   | 
+----+---------------------+---+---------------------+----+-------------+ 
+----+---------------------+---+---------------------+----+-------------+----+ 
|user|user_time   |bus|bus_time    |stop|bus_time_diff|rank| 
+----+---------------------+---+---------------------+----+-------------+----+ 
|A |2016-07-18 12:00:00.0|1 |2016-07-18 12:00:05.0|sC |5   |1 | 
|B |2016-07-19 12:00:00.0|2 |2016-07-19 12:00:10.0|sC |10   |1 | 
|C |2016-07-20 12:00:00.0|3 |2016-07-20 11:59:55.0|sD |5   |1 | 
+----+---------------------+---+---------------------+----+-------------+----+ 
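To answer the "SQL query" part of the question directly: the keep-the-closest-row logic is a standard `ROW_NUMBER()` window query. Below is a runnable sketch using SQLite (requires SQLite 3.25+ for window functions); in Spark SQL the same query shape works against registered temp views, with `unix_timestamp(...)` in place of `strftime('%s', ...)`:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE usertime (user TEXT, time TEXT, bus INTEGER);
    CREATE TABLE bustime (bus INTEGER, time TEXT, stop TEXT);
    INSERT INTO usertime VALUES
        ('A', '2016-07-18 12:00:00', 1),
        ('B', '2016-07-19 12:00:00', 2),
        ('C', '2016-07-20 12:00:00', 3);
    INSERT INTO bustime VALUES
        (1, '2016-07-18 11:59:40', 'sA'),
        (1, '2016-07-18 11:59:50', 'sB'),
        (1, '2016-07-18 12:00:05', 'sC'),
        (2, '2016-07-19 11:59:40', 'sB'),
        (2, '2016-07-19 12:00:10', 'sC'),
        (3, '2016-07-20 11:59:55', 'sD'),
        (3, '2016-07-20 12:00:10', 'sE');
""")

# Rank each bus record by its distance (in seconds) from the user's
# report time, then keep only the closest record per (user, bus).
rows = conn.execute("""
    SELECT user, time, bus, stop FROM (
        SELECT u.user, u.time, u.bus, b.stop,
               ROW_NUMBER() OVER (
                   PARTITION BY u.user, u.bus
                   ORDER BY ABS(strftime('%s', b.time) - strftime('%s', u.time))
               ) AS rn
        FROM usertime u JOIN bustime b ON u.bus = b.bus
    ) ranked
    WHERE rn = 1
    ORDER BY user
""").fetchall()
print([(r[0], r[3]) for r in rows])  # [('A', 'sC'), ('B', 'sC'), ('C', 'sD')]
```

The subquery assigns `rn = 1` to the closest stop within each (user, bus) partition, and the outer query filters on it, mirroring the DataFrame version above.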

This is really nice, it solved my problem, thank you very much! – Finn


You're welcome! –