首先,它不是一個真正的很「簡單」查詢......
首先 - 讓我們創建dataframes一些樣本數據 - 我創造了小case類,只有時間和一個字符串值,你可以替換它們更復雜的類:
case class A(time: Long, aValue: String)
case class B(time: Long, bValue: String)
val tableA = Seq(A(1, "q1"), A(2, "q2"), A(3, "q3"), A(4, "q4"), A(5, "q5"), A(6, "q6"), A(7, "q7"))
val tableB = Seq(B(2, "t1"), B(5, "t2"), B(7, "t3"))
val dfA: DataFrame = sqlContext.createDataFrame(tableA)
val dfB: DataFrame = sqlContext.createDataFrame(tableB)
現在 - 兩個備選方案(其概念上相同):
使用SQL:
dfA.registerTempTable("a")
dfB.registerTempTable("b")
sqlContext.sql(
"""
|SELECT collect_list(c.time), collect_list(c.aValue), first(b.time), first(b.bValue)
|FROM (
| SELECT FIRST(a.time) as time, FIRST(a.aValue) as aValue, MIN(b.time) AS bTime
| FROM a
| JOIN b ON b.time > a.time
| GROUP BY a.time) AS c
|JOIN b ON c.bTime = b.time
|GROUP BY b.time
""".stripMargin).show()
這將打印,的B(時間和bValue)的時間列表和的值的列表中的每個值。
使用DataFrames:
import org.apache.spark.sql.functions._
val aWithMinB: DataFrame = dfA
.join(dfB, dfA("time") < dfB("time"))
.groupBy(dfA("time"))
.agg(first(dfA("aValue")), min(dfB("time")))
.withColumnRenamed("FIRST(aValue)", "aValue")
.withColumnRenamed("min(time)", "bTime")
aWithMinB
.join(dfB, dfB("time") === aWithMinB("bTime"))
.groupBy(dfB("time"))
.agg(collect_list(aWithMinB("time")), collect_list(aWithMinB("aValue")), first(dfB("time")), first(dfB("bValue")))
.show()
注意兩個只會星火1.6.0或更高版本的工作,因爲collect_list
在早期版本中並不存在。
UPDATE:這裏的流量的一些解釋:
- 的第一個查詢(在SQL版本內查詢)是指在表
a
應該建立一個「共同價值」的所有記錄將分組爲成結果中的單個記錄
- 這是什麼常見的值?應分組的
a
中的值是b
中兩個連續記錄之間的值。所以,他們共享最低值b.time
那就是大於那麼他們的時間。換句話說 - 對於a
中的每次X,我們尋找大於X的b
中最小的最小時間。這將是相同的值在a
所有記錄之間的兩個連續b
小號
- 爲了實現這個目標,我們同
a
與b
與b.time > a.time
(獲得的b
多條記錄爲a
每個記錄),然後條件通過a.time
(收縮的結果返回到a
每條記錄一個記錄),取最低b.time
爲每個這樣的記錄,每一列a
的第一值(以第一組並不重要 - 所有分組的記錄對於所有具有相同的值)
- 現在我們對
a
中的每個記錄都有這個「額外信息」,我們將其與b
聯繫起來,並在time
列中將其與該列分組。所有a
與bTime
相同的記錄將被加入到相應的b
記錄中,我們完成了:我們再次對b
的所有列使用first
(同樣,對於所有分組記錄,所有值都相同,因爲我們對b
進行了分組, s的唯一標識符),並在a
的列上使用collect_list
將所有值作爲列表獲取。
我們需要示例數據 - 向我們展示您的底層數據集的外觀(以我們可以將粘貼複製到自己的外殼的方式)。否則,我們不知道如何正確地轉換您的數據! –
我沒有報告數據,因爲我將它包裝在多個案例類中以便於操作。所以我會粘貼原始數據 – mastro
@KatyaHandler我剛剛添加了數據的快照。在原始數據集中,DATE字段也會發生變化,應該在查詢 – mastro