2017-09-27 48 views
0

Spark Scala - 基於時間段的聚合與透視(pivot):我想在 Spark 中實現透視功能

截至目前,我使用 sqlContext 並在 SQL 內完成所有轉換,做法類似於 SQL Server。我想知道是否可以直接從 SQL Server 中獲取數據,並使用 Spark 實現數據透視(pivot)功能。

下面是我想實現的效果。SQL Server 查詢如下:

-- Sample data: one row per (ID, MonthPrior) holding that month's amount.
CREATE TABLE #temp (
    ID         INT,
    MonthPrior INT,
    Amount     FLOAT
);

INSERT INTO #temp
VALUES
    (100, 1, 10), (100, 2, 20), (100, 3, 30),
    (100, 4, 10), (100, 5, 20), (100, 6, 60),
    (200, 1, 10), (200, 2, 20), (200, 3, 30),
    (300, 4, 10), (300, 5, 20), (300, 6, 60);

SELECT * FROM #temp;

| ID | MonthPrior | Amount |
| ------- | ---------- | ------ |
| 100 | 1 | 10 |
| 100 | 2 | 20 |
| 100 | 3 | 30 |
| 100 | 4 | 10 |
| 100 | 5 | 20 |
| 100 | 6 | 60 |
| 200 | 1 | 10 |
| 200 | 2 | 20 |
| 200 | 3 | 30 |
| 300 | 4 | 10 |
| 300 | 5 | 20 |
| 300 | 6 | 60 |

-- Pivot the monthly amounts into one row per ID, then add up the month columns
-- for each reporting window; a month with no row counts as 0.
SELECT
    ID,
    COALESCE([1], 0) AS Amount1Mth,
    COALESCE([1], 0) + COALESCE([2], 0) + COALESCE([3], 0) AS Amount1to3Mth,
    COALESCE([1], 0) + COALESCE([2], 0) + COALESCE([3], 0)
        + COALESCE([4], 0) + COALESCE([5], 0) + COALESCE([6], 0) AS Amount_AllMonths
FROM (SELECT * FROM #temp) A
PIVOT (SUM(Amount) FOR MonthPrior IN ([1], [2], [3], [4], [5], [6])) AS Pvt

| ID | Amount1Mth | Amount1to3Mth | Amount_AllMonths |
| ------- | ------- | ------- | --- |
| 100 | 10 | 60 | 150 |
| 200 | 10 | 60 | 60 |
| 300 | 0 | 0 | 90 |

回答

1

如果您的 Amount 列是 Decimal 類型,最好使用 java.math.BigDecimal 作爲相應的參數類型。請注意,方法 `+` 和 `sum` 不再適用,因此分別替換爲 `add` 和 `reduce`:

import org.apache.spark.sql.functions._ 
import java.math.BigDecimal 

// Sample data from the question as (ID, MonthPrior, Amount) rows, with each
// amount wrapped in a java.math.BigDecimal.
val df = Seq(
    100 -> Seq(1 -> 10, 2 -> 20, 3 -> 30, 4 -> 10, 5 -> 20, 6 -> 60),
    200 -> Seq(1 -> 10, 2 -> 20, 3 -> 30),
    300 -> Seq(4 -> 10, 5 -> 20, 6 -> 60)
).flatMap { case (id, monthlyAmounts) =>
    monthlyAmounts.map { case (monthPrior, amount) =>
        (id, monthPrior, new BigDecimal(amount))
    }
}.toDF("ID", "MonthPrior", "Amount")

// UDF to combine 2 array-type columns (months, amounts) into a month -> amount map.
// Amounts that share a month are added together, matching the sum(Amount) of the
// SQL pivot; a plain `toMap` would silently keep only the last amount per month.
def arrayToMap = udf(
    (a: Seq[Int], b: Seq[BigDecimal]) =>
      (a zip b).groupBy(_._1).map { case (month, pairs) =>
        month -> pairs.map(_._2).reduce(_ add _)
      }
)

// Gather each ID's months and amounts into two array columns, then zip them
// into a single map column with the arrayToMap UDF.
val df2 = {
    val monthList = collect_list(col("MonthPrior")).alias("MonthList")
    val amountList = collect_list(col("Amount")).alias("AmountList")
    val grouped = df.groupBy("ID").agg(monthList, amountList)
    grouped.select(
        col("ID"),
        arrayToMap(col("MonthList"), col("AmountList")).alias("MthAmtMap")
    )
}

// UDF to sum map values for keys from 1 thru n (n <= 0 sums every key).
// Folds from BigDecimal.ZERO so that an empty map yields 0 instead of the
// UnsupportedOperationException that `reduce` throws on an empty collection.
def sumMapValues = udf(
    (m: Map[Int, BigDecimal], n: Int) =>
      m.iterator
        .collect { case (month, amount) if n <= 0 || month <= n => amount }
        .foldLeft(BigDecimal.ZERO)(_ add _)
)

// Derive the three reporting windows straight from the map column:
// month 1 only, months 1 thru 3, and all months (n = 0).
val df3 = df2.select(
    col("ID"),
    sumMapValues(col("MthAmtMap"), lit(1)).as("Amount1Mth"),
    sumMapValues(col("MthAmtMap"), lit(3)).as("Amount1to3Mth"),
    sumMapValues(col("MthAmtMap"), lit(0)).as("Amount_AllMonths")
)

df3.show(truncate = false)
+---+--------------------+--------------------+--------------------+ 
| ID|   Amount1Mth|  Amount1to3Mth| Amount_AllMonths| 
+---+--------------------+--------------------+--------------------+ 
|300|    0E-18|    0E-18|90.00000000000000...| 
|100|10.00000000000000...|60.00000000000000...|150.0000000000000...| 
|200|10.00000000000000...|60.00000000000000...|60.00000000000000...| 
+---+--------------------+--------------------+--------------------+ 
1

一種方法是由 MonthPrior 和 Amount 的陣列創建一個 Map 類型的列,並應用 UDF,根據整數參數對 Map 的值求和:

// Sample data from the question as (ID, MonthPrior, Amount) rows.
val df = Seq(
    100 -> Seq(1 -> 10, 2 -> 20, 3 -> 30, 4 -> 10, 5 -> 20, 6 -> 60),
    200 -> Seq(1 -> 10, 2 -> 20, 3 -> 30),
    300 -> Seq(4 -> 10, 5 -> 20, 6 -> 60)
).flatMap { case (id, monthlyAmounts) =>
    monthlyAmounts.map { case (monthPrior, amount) => (id, monthPrior, amount) }
}.toDF("ID", "MonthPrior", "Amount")

import org.apache.spark.sql.functions._ 

// UDF to combine 2 array-type columns (months, amounts) into a month -> amount map.
// Amounts that share a month are added together, matching the sum(Amount) of the
// SQL pivot; a plain `toMap` would silently keep only the last amount per month.
def arrayToMap = udf(
    (a: Seq[Int], b: Seq[Int]) =>
      (a zip b).groupBy(_._1).map { case (month, pairs) =>
        month -> pairs.map(_._2).sum
      }
)

// Collect each ID's months and amounts into array columns, then turn the pair
// of arrays into one map column via the arrayToMap UDF.
val df2 = {
    val monthList = collect_list(col("MonthPrior")).alias("MonthList")
    val amountList = collect_list(col("Amount")).alias("AmountList")
    val grouped = df.groupBy("ID").agg(monthList, amountList)
    grouped.select(
        col("ID"),
        arrayToMap(col("MonthList"), col("AmountList")).alias("MthAmtMap")
    )
}

// UDF that adds up the map's values: keys 1 thru n when n is positive,
// every key otherwise (callers pass 0 to mean "all").
def sumMapValues = udf(
    (m: Map[Int, Int], n: Int) => {
      val includeAll = n <= 0
      m.iterator.collect { case (month, amount) if includeAll || month <= n => amount }.sum
    }
)

// Apply the sumMapValues UDF to the map column once per reporting window:
// month 1 only, months 1 thru 3, and all months (n = 0).
val df3 = df2.select(
    col("ID"),
    sumMapValues(col("MthAmtMap"), lit(1)).as("Amount1Mth"),
    sumMapValues(col("MthAmtMap"), lit(3)).as("Amount1to3Mth"),
    sumMapValues(col("MthAmtMap"), lit(0)).as("Amount_AllMonths")
)

df3.show
+---+----------+-------------+----------------+ 
| ID|Amount1Mth|Amount1to3Mth|Amount_AllMonths| 
+---+----------+-------------+----------------+ 
|300|   0|   0|    90| 
|100|  10|   60|    150| 
|200|  10|   60|    60| 
+---+----------+-------------+----------------+ 
+0

謝謝@LeoC 我會分析一下這個方法。似乎工作 –

+0

很高興它有幫助。如果解決了發佈的問題,請通過接受答案來解決問題。 –

+0

`(a: Seq[Int], b: Seq[Int]) => (a zip b).toMap` 出現了問題,我正在研究。錯誤信息:由於數據類型不匹配,無法解析 'UDF(col_1, col_2)':參數 2 需要 array<int> 類型,但 'col_2' 是 array<decimal> 類型。我嘗試在 udf 中使用 numeric/decimal,仍然沒有成功 –

0

Thanks @LeoC。以上解決方案工作我也試過以下幾種 -

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.Column 


// Distinct MonthPrior values in ascending order, collected into a local List.
lazy val months = df
    .select($"MonthPrior")
    .distinct()
    .sort($"MonthPrior".asc)
    .rdd
    .map(_.getAs[Int](0))
    .collect()
    .toList

// (start, end, alias) triples: (0, 2, "1-2") thru (0, 6, "1-6").
lazy val sliceSpec = (2 to 6).toList.map(end => (0, end, s"1-$end"))

// Given a list of values naming pivoted columns, returns a function that sums
// the columns at positions [start, finish) of that list and aliases the result.
// Each column is wrapped in coalesce(..., 0): the pivot leaves null where an ID
// has no row for a month, and adding null yields null. This mirrors the
// coalesce([n], 0) terms of the SQL Server query being ported.
lazy val createGroup: List[Any] => ((Int, Int, String) => Column) = sliceMe => (start, finish, aliasName) =>
    sliceMe
      .slice(start, finish)
      .map(value => coalesce(col(value.toString), lit(0)))
      .reduce(_ + _)
      .as(aliasName)

// createGroup bound to the collected months, accepting one (start, finish, alias) tuple.
lazy val grouper: ((Int, Int, String)) => Column = createGroup(months).tupled

// One summed, aliased column per entry of sliceSpec.
lazy val groupedCols: List[Column] = sliceSpec.map(grouper)

// One row per ID with one summed Amount column per MonthPrior value.
val pivoted = df.groupBy($"ID").pivot("MonthPrior").agg(sum($"Amount"))

// Keep every pivoted column and append the grouped range sums.
val writeMe = pivoted.select(pivoted.columns.map(col) ++ groupedCols: _*)

z.show(writeMe.sort($"ID".asc))