2017-10-08 37 views
0

我有這個簡單的數據幀,看起來像這樣,如何使用spark數據框列的字面值?

+---+---+---+---+ 
|nm | ca| cb| cc| 
+---+---+---+---+ 
| a|123| 0| 0| 
| b| 1| 2| 3| 
| c| 0| 1| 0| 
+---+---+---+---+ 

我想要做的是,

+---+---+---+---+---+ 
|nm |ca |cb |cc |p | 
+---+---+---+---+---+ 
|a |123|0 |0 |1 | 
|b |1 |2 |3 |1 | 
|c |0 |1 |0 |0 | 
+---+---+---+---+---+ 

bascially增加了一個新的列p,例如,如果nm列的值是'一個」,檢查列ca> 0,如果是把 '1' 塔P1,否則爲0

我的代碼,

 def purchaseCol: UserDefinedFunction = 
    udf((brand: String) => s"c$brand") 

val a = ss.createDataset(List(
     ("a", 123, 0, 0), 
     ("b", 1, 2, 3), 
     ("c", 0, 1, 0))) 
    .toDF("nm", "ca", "cb", "cc") 

a.show() 
a.withColumn("p", when(lit(DataFrameUtils.purchaseCol($"nm")) > 0, 1).otherwise(0)) 
.show(false) 

它似乎沒有工作,並且正在爲'p'中的所有行返回0。

PS:列數超過100,它們是動態生成的。

回答

1

地圖上rdd,計算並添加p每一行:

val a = sc.parallelize(
    List(("a", 123, 0, 0), 
     ("b", 1, 2, 3), 
     ("c", 0, 1, 0)) 
).toDF("nm", "ca", "cb", "cc") 

import org.apache.spark.sql.Row 
import org.apache.spark.sql.types._ 

val b = a.rdd.map(r => { 
    val s = r.getAs[String]("nm") 
    val v = r.getAs[Int](s"c$s") 
    val p = if(v > 0) 1 else 0 
    Row.fromSeq(r.toSeq :+ p) 
}) 

val new_schema = StructType(a.schema :+ StructField("p", IntegerType, true)) 

val df_new = spark.createDataFrame(b, new_schema) 

df_new.show 
+---+---+---+---+---+ 
| nm| ca| cb| cc| p| 
+---+---+---+---+---+ 
| a|123| 0| 0| 1| 
| b| 1| 2| 3| 1| 
| c| 0| 1| 0| 0| 
+---+---+---+---+---+ 
+0

你這是我所做的大部分,並不是真的想使用rdds和重建數據框。正在尋找更優雅的解決方案,但是,wr,+1 –

0

看你的邏輯

如果列處的值是 'A',檢查列CA是> 0,如果是,則對於列p1否則將'1'置爲0.

,你可以簡單地做

import org.apache.spark.sql.functions._ 
a.withColumn("p", when((col("nm") === lit("a")) && (col("ca") > 0), lit(1)).otherwise(lit(0))) 

但看着你的輸出dataframe,您將需要一個||代替&&

import org.apache.spark.sql.functions._ 
a.withColumn("p", when((col("nm") === lit("a")) || (col("ca") > 0), lit(1)).otherwise(lit(0))) 
+0

的挑戰是做它的程序,a是ca的一部分,b是cb等的一部分。 –

0

如果「C *」列數量是有限的,UDF的所有值可使用:

val nameMatcherFunct = (nm: String, ca: Int, cb: Int, cc: Int) => { 
    val value = nm match { 
    case "a" => ca 
    case "b" => cb 
    case "c" => cc 
    } 
    if (value > 0) 1 else 0 
} 

def purchaseValueUDF = udf(nameMatcherFunct) 

val result = a.withColumn("p", purchaseValueUDF(col("nm"), col("ca"), col("cb"), col("cc"))) 

如果您有許多「c *」列,可以使用Row作爲參數的函數: How to pass whole Row to UDF - Spark DataFrame filter

+0

你是那個問題,列是動態的 –

-1
val a1 = sc.parallelize(
    List(("a", 123, 0, 0), 
     ("b", 1, 2, 3), 
     ("c", 0, 1, 0)) 
).toDF("nm", "ca", "cb", "cc") 

a1.show() 


+---+---+---+---+ 
| nm| ca| cb| cc| 
+---+---+---+---+ 
| a|123| 0| 0| 
| b| 1| 2| 3| 
| c| 0| 1| 0| 
+---+---+---+---+ 


val newDf = a1.withColumn("P", when($"ca" > 0, 1).otherwise(0)) 
newDf.show() 

+---+---+---+---+---+ 
| nm| ca| cb| cc| P| 
+---+---+---+---+---+ 
| a|123| 0| 0| 1| 
| b| 1| 2| 3| 1| 
| c| 0| 1| 0| 0| 
+---+---+---+---+---+ 
+0

noe,this是錯的,你必須考慮每行的不同列。假設如果在另一行「d,1,4,5,0」之後有另一個列cd,則與此相對應的P的值將爲0,但是您的llogic會將其標記爲1 –

相關問題