2017-10-09 73 views
1

我正試圖在火花中實現二次排序。準確地說,對於用戶會話的所有事件,我想根據時間戳對它們進行排序。我需要遍歷會話的每個事件來實現業務邏輯。我這樣做如下:如何在數據框上使用combineByKey

def createCombiner = (row: Row) => Array(row) 

def mergeValue = (rows: Array[Row], row: Row) => { 
    rows :+ row 
} 

def mergeCombiner = (rows1: Array[Row], rows2: Array[Row]) => rows1 ++ rows2 

def attribute(eventsList: List[Row]): List[Row] = { 
    for (row: Row <- eventsList) { 
    // some logic 
    } 
} 

var groupedAndSortedRows = rawData.rdd.map(row => { 
    (row.getAs[String]("session_id"), row) 
}).combineByKey(createCombiner, mergeValue, mergeCombiner) 
    .mapValues(_.toList.sortBy(_.getAs[String]("client_ts"))) 
    .mapValues(attribute) 

但我擔心這是不是最長時間來做到這一點,當轉換到RDD將需要反序列化和系列化,我相信,隨着dataframes工作時,不需要有效的方法/數據集。

我不知道是否有一個聚合函數返回整個行

rawData.groupBy("session_id").someAggregateFunction() 

我想someAggregateFunction()返回的Rows名單。我不想在某些列上聚合,但希望整個Rows的列表對應於session_id。是否有可能做到這一點?

回答

0

答案是肯定的,但可能不是你所期望的。取決於你的業務邏輯的複雜程度,還有比combineByKey其他2個alernatives

  1. 如果你只需要平均值,最小值,最大值和其他已知的功能[spark.sql.functions] [1]

    定義

    [1]:https://github.com/apache/spark/blob/v2.0.2/sql/core/src/main/scala/org/apache/spark/sql/functions.scala你當然可以用groupBy(...)。agg(...)。我想那不是你的情況。所以,如果你想實現自己的UDAF這是沒有比combineByKey更好,除非這個商業邏輯是很常見的,可以重新用於其他數據集

  2. ,或者您需要稍微複雜的邏輯,你可以使用窗口函數 要使用Window.partitionBy($「session_id」)。orderBy($「client_ts」desc)來指定窗口規格,那麼您可以輕鬆實現topN,移動平均,ntile等。見https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html,您也可以自己實現自定義窗口合併函數

相關問題