3

通常,組中的所有行都被傳遞給聚合函數。我想使用一個條件來過濾行,以便只有組中的某些行被傳遞給一個聚合函數。這種操作可能與PostgreSQL。我想用Spark SQL DataFrame(Spark 2.0.0)做同樣的事情。如何使用spark sql篩選特定聚合的行?

代碼可能看起來是這樣的:

val df = ... // some data frame 
df.groupBy("A").agg(
    max("B").where("B").less(10), // there is no such method as `where` :(
    max("C").where("C").less(5) 
) 

因此,對於這樣一個數據幀:

| A | B | C | 
| 1| 14| 4| 
| 1| 9| 3| 
| 2| 5| 6| 

其結果將是:

|A|max(B)|max(C)| 
|1| 9|  4| 
|2| 5| null| 

是否有可能與Spark SQL?

請注意,通常可以使用除max以外的任何其他聚合函數,並且可以在具有任意過濾條件的同一列上使用多個聚合。

+0

我'首先用null或NaN替換超出限制的所有值,然後我將groupBy和聚合。 –

+0

這適用於這種特殊情況,但如果在具有不同過濾條件的同一列上有多個聚合,它將不起作用。 –

回答

0
>>> df = sc.parallelize([[1,14,1],[1,9,3],[2,5,6]]).map(lambda t: Row(a=int(t[0]),b=int(t[1]),c=int(t[2]))).toDF() 
    >>> df.registerTempTable('t') 
    >>> res = sqlContext.sql("select a,max(case when b<10 then b else null end) mb,max(case when c<5 then c else null end) mc from t group by a") 

    +---+---+----+ 
    | a| mb| mc| 
    +---+---+----+ 
    | 1| 9| 3| 
    | 2| 5|null| 
    +---+---+----+ 

可以使用SQL(我相信你在Postgres的做同樣的事情?)

0
df.groupBy("name","age","id").agg(functions.max("age").$less(20),functions.max("id").$less("30")).show(); 

樣本數據:

name age id 
abc  23 1001 
cde  24 1002 
efg  22 1003 
ghi  21 1004 
ijk  20 1005 
klm  19 1006 
mno  18 1007 
pqr  18 1008 
rst  26 1009 
tuv  27 1010 
pqr  18 1012 
rst  28 1013 
tuv  29 1011 
abc  24 1015 

輸出:

+----+---+----+---------------+--------------+ 
|name|age| id|(max(age) < 20)|(max(id) < 30)| 
+----+---+----+---------------+--------------+ 
| rst| 26|1009|   false|   true| 
| abc| 23|1001|   false|   true| 
| ijk| 20|1005|   false|   true| 
| tuv| 29|1011|   false|   true| 
| efg| 22|1003|   false|   true| 
| mno| 18|1007|   true|   true| 
| tuv| 27|1010|   false|   true| 
| klm| 19|1006|   true|   true| 
| cde| 24|1002|   false|   true| 
| pqr| 18|1008|   true|   true| 
| abc| 24|1015|   false|   true| 
| ghi| 21|1004|   false|   true| 
| rst| 28|1013|   false|   true| 
| pqr| 18|1012|   true|   true| 
+----+---+----+---------------+--------------+ 
+0

這實際上並沒有回答原來的問題。這只是在聚合之後提供額外的運算符,而不是之前的過濾。 –

4
val df = Seq((1,14,4),(1,9,3),(2,5,6)).toDF("a","b","c") 

val agg = df.groupBy("a").agg(max(when($"b" < 10, $"b")).as("MaxB"), max(when($"c" < 5, $"c")).as("MaxC")) 

agg.show 
+1

如果你解釋你在這裏做什麼,這將是很好的 – MZaragoza