groupByKey
洗牌數據和分組值的順序不再保證。它也相當昂貴。
如果你真的想在RDDs
沒有DataFrames
和排序操作的基礎上的日期,你可以使用aggregateByKey
:
import scala.math.Ordering
type Record = (String, String, String, Double, java.util.Date)
val RecordOrd = Ordering.by[Record, java.util.Date](_._5)
val minRecord = ("", "", "", 0.0, new java.util.Date(Long.MinValue))
val maxRecord = ("", "", "", 0.0, new java.util.Date(Long.MaxValue))
def minMax(x: (Record, Record), y: (Record, Record)) = {
(RecordOrd.min(x._1, y._1), RecordOrd.max(x._2, y._2))
}
rdd.aggregateByKey((maxRecord, minRecord))(
(acc, x) => minMax(acc, (x, x)),
minMax
)
隨着DataFrames
你可以嘗試這樣的事:
import org.apache.spark.sql.functions.{col, lag, lead, when, row_number, max}
import org.apache.spark.sql.expressions.Window
val partition = Seq("column1")
val order = Seq("column5")
val columns = Seq("column2", "column3", "column4","column5")
val w = Window
.partitionBy(partition.head, partition.tail: _*)
.orderBy(order.head, order.tail: _*)
// Lead/lag of row number to mark first/last row in the group
val rn_lag = lag(row_number.over(w), 1).over(w)
val rn_lead = lead(row_number.over(w), 1).over(w)
// Select value if first/last row in the group otherwise null
val firstColumns = columns.map(
c => when(rn_lag.isNull, col(c)).alias(s"${c}_first"))
val lastColumns = columns.map(
c => when(rn_lead.isNull, col(c)).alias(s"${c}_last"))
// Add columns with first/last vals
val expanded = df.select(
partition.map(col(_)) ++ firstColumns ++ lastColumns: _*)
// Aggregate to drop nulls
val aggExprs = expanded.columns.diff(partition).map(c => max(c).alias(c))
expanded.groupBy(partition.map(col(_)): _*).agg(aggExprs.head, aggExprs.tail: _*)
有還有其他一些方法可以通過DataFrames
解決此問題,包括通過structs
和DataSet
API進行排序。看到我的回答SPARK DataFrame: select the first row of each group
這不會提取第一個和最後一個。它只是提取一個恰好在groupByKey之後第一個或最後一個任意的行?這是你想要的嗎?如果不是,你想如何訂購這些值? – zero323
Cassandra在插入過程中按日期排序表格行。 –
但是'groupByKey'不能保證順序將在shuffle期間保留。 – zero323