2016-03-30 103 views
-1

我首先說我是SQL新手,所以這個問題可能是微不足道的。 我有兩個帶時間戳鍵的表。 對於每個事件t_itable 1我希望所有的事件qtable 2這樣的:SQL簡單加入查詢

q.timeStamp < t_i.timeStamp and q.timeStamp > t_{i-1}.timeStamp 

也就是說,如果事件按照這個順序時間戳發生:

q1 
t1 q2 
    q3 
    q4 
t2 q5 
    q6 
t3 q7 

則結果查詢應該是:

t1: q1 
t2: q2 q3 q4 
t3: q5 q6 

我正在使用Scala與SQL Spark和DataSet a nd DataFrame類,所以無論是純函數式的「groupBy」還是SQL查詢都不錯。

+1

我們需要示例數據 - 向我們展示您的底層數據集的外觀(以我們可以將粘貼複製到自己的外殼的方式)。否則,我們不知道如何正確地轉換您的數據! –

+0

我沒有報告數據,因爲我將它包裝在多個案例類中以便於操作。所以我會粘貼原始數據 – mastro

+0

@KatyaHandler我剛剛添加了數據的快照。在原始數據集中,DATE字段也會發生變化,應該在查詢 – mastro

回答

1

首先,它不是一個真正的很「簡單」查詢......

首先 - 讓我們創建dataframes一些樣本數據 - 我創造了小case類,只有時間和一個字符串值,你可以替換它們更復雜的類:

case class A(time: Long, aValue: String) 
case class B(time: Long, bValue: String) 

val tableA = Seq(A(1, "q1"), A(2, "q2"), A(3, "q3"), A(4, "q4"), A(5, "q5"), A(6, "q6"), A(7, "q7")) 
val tableB = Seq(B(2, "t1"), B(5, "t2"), B(7, "t3")) 

val dfA: DataFrame = sqlContext.createDataFrame(tableA) 
val dfB: DataFrame = sqlContext.createDataFrame(tableB) 

現在 - 兩個備選方案(其概念上相同):

  1. 使用SQL

    dfA.registerTempTable("a") 
    dfB.registerTempTable("b") 
    
    sqlContext.sql(
        """ 
        |SELECT collect_list(c.time), collect_list(c.aValue), first(b.time), first(b.bValue) 
        |FROM (
        | SELECT FIRST(a.time) as time, FIRST(a.aValue) as aValue, MIN(b.time) AS bTime 
        | FROM a 
        | JOIN b ON b.time > a.time 
        | GROUP BY a.time) AS c 
        |JOIN b ON c.bTime = b.time 
        |GROUP BY b.time 
        """.stripMargin).show() 
    

    這將打印,的B(時間和bValue)的時間列表和的值的列表中的每個值。

  2. 使用DataFrames

    import org.apache.spark.sql.functions._ 
    
    val aWithMinB: DataFrame = dfA 
        .join(dfB, dfA("time") < dfB("time")) 
        .groupBy(dfA("time")) 
        .agg(first(dfA("aValue")), min(dfB("time"))) 
        .withColumnRenamed("FIRST(aValue)", "aValue") 
        .withColumnRenamed("min(time)", "bTime") 
    
    aWithMinB 
        .join(dfB, dfB("time") === aWithMinB("bTime")) 
        .groupBy(dfB("time")) 
        .agg(collect_list(aWithMinB("time")), collect_list(aWithMinB("aValue")), first(dfB("time")), first(dfB("bValue"))) 
        .show() 
    

注意兩個只會星火1.6.0或更高版本的工作,因爲collect_list在早期版本中並不存在。

UPDATE:這裏的流量的一些解釋:

  • 的第一個查詢(在SQL版本內查詢)是指在表a應該建立一個「共同價值」的所有記錄將分組爲成結果中的單個記錄
  • 這是什麼常見的值?應分組的a中的值是b中兩個連續記錄之間的值。所以,他們共享最低b.time那就是大於那麼他們的時間。換句話說 - 對於a中的每次X,我們尋找大於X的b中最小的最小時間。這將是相同的值a所有記錄之間的兩個連續b小號
  • 爲了實現這個目標,我們同abb.time > a.time(獲得的b多條記錄爲a每個記錄),然後條件通過a.time(收縮的結果返回到a每條記錄​​一個記錄),取最低b.time爲每個這樣的記錄,每一列a第一值(以第一組並不重要 - 所有分組的記錄對於所有具有相同的值)
  • 現在我們對a中的每個記錄都有這個「額外信息」,我們將其與b聯繫起來,並在time列中將其與該列分組。所有abTime相同的記錄將被加入到相應的b記錄中,我們完成了:我們再次對b的所有列使用first(同樣,對於所有分組記錄,所有值都相同,因爲我們對b進行了分組, s的唯一標識符),並在a的列上使用collect_list將所有值作爲列表獲取。
+0

中考慮這個問題,但背後的原因尚不清楚,因爲我之前沒有SQL經驗,您能向我解釋嗎? – mastro

+0

增加了詳細的解釋,希望這有助於。如果沒有,對不起:(拿一個SQL教程來理解加入的邏輯意義,在哪裏,分組...... –