2016-02-12 111 views
2

我有這段代碼從Cassandra獲取RDD,然後爲每個鍵提取第一行和最後一行並將它們相減。在Spark中從Cassandra中扣除第一行和最後一行的值

val rdd = sc.cassandraTable("keyspace","table") 
    .select("column1", "column2", "column3", "column4","column5") 
    .as((i:String, p:String, e:String, c:Double, a:java.util.Date) => ((i), (c, a, p, e))) 
    .groupByKey.mapValues(v => v.toList) 
    .cache 

val finalValues = rdd.mapValues(v => v.head) 
val initialValues = rdd.mapValues(v => v.last) 
val valuesCombined = finalValues.join(initialValues) 

val results = valuesCombined.map(v => (v._2._1._1 - v._2._2._1)) 

性能好還是有更好的解決方案?我不確定將整個數據集緩存在內存中。

+0

這不會提取第一個和最後一個。它只是提取一個恰好在groupByKey之後第一個或最後一個任意的行?這是你想要的嗎?如果不是,你想如何訂購這些值? – zero323

+0

Cassandra在插入過程中按日期排序表格行。 –

+0

但是'groupByKey'不能保證順序將在shuffle期間保留。 – zero323

回答

2

groupByKey洗牌數據和分組值的順序不再保證。它也相當昂貴。

如果你真的想在RDDs沒有DataFrames和排序操作的基礎上的日期,你可以使用aggregateByKey

import scala.math.Ordering 

type Record = (String, String, String, Double, java.util.Date) 
val RecordOrd = Ordering.by[Record, java.util.Date](_._5) 

val minRecord = ("", "", "", 0.0, new java.util.Date(Long.MinValue)) 
val maxRecord = ("", "", "", 0.0, new java.util.Date(Long.MaxValue)) 

def minMax(x: (Record, Record), y: (Record, Record)) = { 
    (RecordOrd.min(x._1, y._1), RecordOrd.max(x._2, y._2)) 
} 

rdd.aggregateByKey((maxRecord, minRecord))(
    (acc, x) => minMax(acc, (x, x)), 
    minMax 
) 

隨着DataFrames你可以嘗試這樣的事:

import org.apache.spark.sql.functions.{col, lag, lead, when, row_number, max} 
import org.apache.spark.sql.expressions.Window 

val partition = Seq("column1") 
val order = Seq("column5") 
val columns = Seq("column2", "column3", "column4","column5") 

val w = Window 
    .partitionBy(partition.head, partition.tail: _*) 
    .orderBy(order.head, order.tail: _*) 

// Lead/lag of row number to mark first/last row in the group 
val rn_lag = lag(row_number.over(w), 1).over(w) 
val rn_lead = lead(row_number.over(w), 1).over(w) 

// Select value if first/last row in the group otherwise null 
val firstColumns = columns.map(
    c => when(rn_lag.isNull, col(c)).alias(s"${c}_first")) 
val lastColumns = columns.map(
    c => when(rn_lead.isNull, col(c)).alias(s"${c}_last")) 

// Add columns with first/last vals 
val expanded = df.select(
    partition.map(col(_)) ++ firstColumns ++ lastColumns: _*) 

// Aggregate to drop nulls 
val aggExprs = expanded.columns.diff(partition).map(c => max(c).alias(c)) 
expanded.groupBy(partition.map(col(_)): _*).agg(aggExprs.head, aggExprs.tail: _*) 

有還有其他一些方法可以通過DataFrames解決此問題,包括通過structsDataSet API進行排序。看到我的回答SPARK DataFrame: select the first row of each group

+0

感謝您的輸入。我已經完成了Spark上的Datastax教程,它基於RDD,並且最終我只是提到了DataFrames,我是否認爲RDD是最佳選擇。現在看完這個[link](http://www.agildata.com/apache-spark-rdd-vs-dataframe-vs-dataset/)後,我知道DataFrames在性能方面更好。我會嘗試使用DataFrame編寫代碼。如果您有時間,我將非常感謝您使用DataFrames編寫它。 –

+0

@PawełSzychiewicz我把另一個答案和一些例子聯繫起來,你如何解決組中第一個(最後一個)行選擇可能比窗口函數更直觀的問題。 – zero323

1

首先 - 我假設all變量是指名爲rdd?創建後,您不需要使用連接(這是昂貴的性能代價),你可以簡單地每個元素直接映射到結果,你需要:

val results = all.mapValues(v => v.head - v.last).values 

現在 - 因爲我們只進行了在RDD上單獨動作,我們也可以擺脫cache()

+0

這不起作用。 'head'和'last'可以是任意元素,特別是因爲'groupByKey'禁用了地圖邊聚合。如果你想'mapValues',你應該先執行訂單。 – zero323

相關問題