我正試圖在火花中實現二次排序。準確地說,對於用戶會話的所有事件,我想根據時間戳對它們進行排序。我需要遍歷會話的每個事件來實現業務邏輯。我這樣做如下:如何在數據框上使用combineByKey
def createCombiner = (row: Row) => Array(row)
def mergeValue = (rows: Array[Row], row: Row) => {
rows :+ row
}
def mergeCombiner = (rows1: Array[Row], rows2: Array[Row]) => rows1 ++ rows2
def attribute(eventsList: List[Row]): List[Row] = {
for (row: Row <- eventsList) {
// some logic
}
}
var groupedAndSortedRows = rawData.rdd.map(row => {
(row.getAs[String]("session_id"), row)
}).combineByKey(createCombiner, mergeValue, mergeCombiner)
.mapValues(_.toList.sortBy(_.getAs[String]("client_ts")))
.mapValues(attribute)
但我擔心這是不是最長時間來做到這一點,當轉換到RDD將需要反序列化和系列化,我相信,隨着dataframes工作時,不需要有效的方法/數據集。
我不知道是否有一個聚合函數返回整個行
rawData.groupBy("session_id").someAggregateFunction()
我想someAggregateFunction()
返回的Rows
名單。我不想在某些列上聚合,但希望整個Rows
的列表對應於session_id
。是否有可能做到這一點?