2017-04-05 514 views
0

我有一個帶有6列的RDD,其中最後5列可能包含NaN。我的目的是用非Nan的最後5個值的其餘值的平均值替換NaN。例如,具有這種輸入:用Scala-Spark中的平均值填充Nan

1, 2, 3, 4, 5, 6 
2, 2, 2, NaN, 4, 0 
3, NaN, NaN, NaN, 6, 0 
4, NaN, NaN, 4, 4, 0 

輸出應該是:

1, 2, 3, 4, 5, 6 
2, 2, 2, 2, 4, 0 
3, 3, 3, 3, 6, 0 
4, 3, 3, 4, 4, 0 

我知道如何填補這些NaN的配列改造RDDDataFrame的平均值:

var aux1 = df.select(df.columns.map(c => mean(col(c))) :_*) 
var aux2 = df.na.fill(/*get values of aux1*/) 

我的問題是,你如何做這個操作,而不是用平均列填充NaN,用平均值填充一個子集行的p?

回答

2

你可以通過定義一個函數來獲取平均做到這一點,和其他功能連續補空。

由於DF您呈現:

val df = sc.parallelize(List((Some(1),Some(2),Some(3),Some(4),Some(5),Some(6)),(Some(2),Some(2),Some(2),None,Some(4),Some(0)),(Some(3),None,None,None,Some(6),Some(0)),(Some(4),None,None,Some(4),Some(4),Some(0)))).toDF("a","b","c","d","e","f") 

我們需要一個函數來獲取行的意思是:

import org.apache.spark.sql.Row 
def rowMean(row: Row): Int = { 
    val nonNulls = (0 until row.length).map(i => (!row.isNullAt(i), row.getAs[Int](i))).filter(_._1).map(_._2).toList 
    nonNulls.sum/nonNulls.length 
} 

,另一個在行填充空值:

def rowFillNulls(row: Row, fill: Int): Row = { 
    Row((0 until row.length).map(i => if (row.isNullAt(i)) fill else row.getAs[Int](i)) : _*) 
} 

現在,我們可以首先計算每一行平均:

val rowWithMean = df.map(row => (row,rowMean(row))) 

然後往裏面:

val result = sqlContext.createDataFrame(rowWithMean.map{case (row,mean) => rowFillNulls(row,mean)}, df.schema) 

之前和之後的最後查看...

df.show 
+---+----+----+----+---+---+ 
| a| b| c| d| e| f| 
+---+----+----+----+---+---+ 
| 1| 2| 3| 4| 5| 6| 
| 2| 2| 2|null| 4| 0| 
| 3|null|null|null| 6| 0| 
| 4|null|null| 4| 4| 0| 
+---+----+----+----+---+---+ 

result.show 
+---+---+---+---+---+---+ 
| a| b| c| d| e| f| 
+---+---+---+---+---+---+ 
| 1| 2| 3| 4| 5| 6| 
| 2| 2| 2| 2| 4| 0| 
| 3| 3| 3| 3| 6| 0| 
| 4| 3| 3| 4| 4| 0| 
+---+---+---+---+---+---+ 

這會爲任何寬度DF工作爲int的列。您可以輕鬆地此更新到其它數據類型,甚至非數字(提示,檢查DF模式!)

1

嗯,這是一個有趣的小問題 - 我會後我的解決辦法,但我一定會看,看是否有人想出了這樣做:)

首先一個更好的方式我會介紹一些udf S:

val avg = udf((values: Seq[Integer]) => { 
    val notNullValues = values.filter(_ != null).map(_.toInt) 
    notNullValues.sum/notNullValues.length 
}) 

val replaceNullWithAvg = udf((x: Integer, avg: Integer) => if(x == null) avg else x) 

,我會再申請到DataFrame這樣的:

dataframe 
    .withColumn("avg", avg(array(df.columns.tail.map(s => df.col(s)):_*))) 
    .select('col1, replaceNullWithAvg('col2, 'avg) as "col2", replaceNullWithAvg('col3, 'avg) as "col3", replaceNullWithAvg('col4, 'avg) as "col4", replaceNullWithAvg('col5, 'avg) as "col5", replaceNullWithAvg('col6, 'avg) as "col6") 

這將讓你什麼ÿ OU正在尋找,但無疑不是最複雜的代碼,我曾經放在一起......

+0

所以,我對我的回答改進,使得'平均'-udf能夠處理任意數量的列。我尊重你已經接受了另一個答案,但我想指出,我的解決方案不需要你在'rdds'和'dataframes'之間來回切換,而是直接在'dataframe'上運行:) –

1

一堆進口:

import org.apache.spark.sql.functions.{col, isnan, isnull, round, when} 
import org.apache.spark.sql.Column 

一些輔助函數:

def nullOrNan(c: Column) = isnan(c) || isnull(c) 

def rowMean(cols: Column*): Column = { 
    val sum = cols 
    .map(c => when(nullOrNan(c), lit(0.0)).otherwise(c)) 
    .fold(lit(0.0))(_ + _) 
    val count = cols 
    .map(c => when(nullOrNan(c), lit(0.0)).otherwise(lit(1.0))) 
    .fold(lit(0.0))(_ + _) 
    sum/count 
} 

A液:

val mean = round(
    rowMean(df.columns.tail.map(col): _*) 
).cast("int").alias("mean") 

val exprs = df.columns.tail.map(
    c => when(nullOrNan(col(c)), mean).otherwise(col(c)).alias(c) 
) 

val filled = df.select(col(df.columns(0)) +: exprs: _*)