2017-10-19 67 views
0

如斯卡拉一個例子,我有一個列表,每相匹配我希望出現兩次(可能不適合這種使用情況下,最好的選擇條件的項目 - 但知道哪數):迭代通過在數據幀的行和變換一對多

l.flatMap { 
    case n if n % 2 == 0 => List(n, n) 
    case n => List(n) 
} 

我願做星火類似的東西 - 在一個數據幀遍歷行,如果行符合一定的條件,那麼我需要在副本中進行一些修改複製的行。如何才能做到這一點?

例如,如果我輸入如下表所示:

| name | age | 
|-------|-----| 
| Peter | 50 | 
| Paul | 60 | 
| Mary | 70 | 

我想通過表迭代和測試各行對多個條件,併爲每個匹配,一個條目應以創建條件匹配條件的名稱。

E.g.條件#1爲 「年齡> 60」 和條件#2是 「name.length < = 4」。這將導致下面的輸出:

| name | age |condition| 
|-------|-----|---------| 
| Paul | 60 | 2 | 
| Mary | 70 | 1 | 
| Mary | 70 | 2 | 
+0

你應該能夠做到這一點的'flatMap'爲好。你能顯示一些實際數據嗎? – Psidom

+0

加入讓它能夠更清晰 –

+0

你想放棄行,其中'name.length> 4',如果'年齡> 60'也'name.length> 4'?你還需要*條件*列嗎? – Psidom

回答

2

您可以filter匹配條件dataframes,最後union所有的人。

import org.apache.spark.sql.functions._ 
val condition1DF = df.filter($"age" > 60).withColumn("condition", lit(1)) 
val condition2DF = df.filter(length($"name") <= 4).withColumn("condition", lit(2)) 

val finalDF = condition1DF.union(condition2DF) 

你應該有你想要的輸出

+----+---+---------+ 
|name|age|condition| 
+----+---+---------+ 
|Mary|70 |1  | 
|Paul|60 |2  | 
|Mary|70 |2  | 
+----+---+---------+ 

我希望答案是有幫助的

+0

謝謝,這種解決方案給了我最大的靈活性,並完美適合多種條件,從配置讀取條件,多個表等 –

+0

很高興聽到@EvanM。 :)感謝您的支持和接受 –

1

這裏是rdd.flatMap扁平化的一種方式:

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.Row 

val new_rdd = (df.rdd.flatMap(r => { 
    val conditions = Seq((1, r.getAs[Int](1) > 60), (2, r.getAs[String](0).length <= 4)) 
    conditions.collect{ case (i, c) if c => Row.fromSeq(r.toSeq :+ i) } 
})) 

val new_schema = StructType(df.schema :+ StructField("condition", IntegerType, true)) 

spark.createDataFrame(new_rdd, new_schema).show 
+----+---+---------+ 
|name|age|condition| 
+----+---+---------+ 
|Paul| 60|  2| 
|Mary| 70|  1| 
|Mary| 70|  2| 
+----+---+---------+ 
+0

你爲什麼'df.rdd'使用'flatMap'? –

+0

@JacekLaskowski如果我不使用'rdd'我得到的錯誤。 *無法找到編碼器... *。 – Psidom

+0

@Psidom嘗試'進口spark.implicits._',有了它,你應該能夠使用'flatMap'對數據幀來代替。 'Spark'這裏是'SparkSession'。 – Shaido

1

您也可以使用UDF和explode()的組合,如下面的例子:

// set up example data 
case class Pers1 (name:String,age:Int) 
val d = Seq(Pers1("Peter",50), Pers1("Paul",60), Pers1("Mary",70)) 
val df = spark.createDataFrame(d) 

// conditions logic - complex as you'd like 
// probably should use a Set instead of Sequence but I digress.. 
val conditions:(String,Int)=>Seq[Int] = { (name,age) => 
    (if(age > 60) Seq(1) else Seq.empty) ++ 
    (if(name.length <=4) Seq(2) else Seq.empty) 
} 
// define UDF for spark 
import org.apache.spark.sql.functions.udf 
val conditionsUdf = udf(conditions) 
// explode() works just like flatmap 
val result = df.withColumn("condition", 
    explode(conditionsUdf(col("name"), col("age")))) 
result.show 

+----+---+---------+ 
|name|age|condition| 
+----+---+---------+ 
|Paul| 60|  2| 
|Mary| 70|  1| 
|Mary| 70|  2| 
+----+---+---------+