2016-04-03 145 views
3

我有List[Double],如何將其轉換爲org.apache.spark.sql.Column。我試圖將其作爲列使用.withColumn()插入到現有的DataFrame中。如何將列表[Double]轉換爲列?

+0

'List [Double]'中的Double元素是什麼? –

+0

@JacekLaskowski,它只是一個數字列表(雙數據類型),我想添加爲現有數據框中的列。 – vdep

+0

@vdep什麼是標題編輯?我不明白。 – eliasah

回答

8

它不能直接完成。 Column不是數據結構,而是特定SQL表達式的表示。它不受特定數據的約束。你必須先轉換你的數據。接近這一點的一種方式是parallelizejoin通過索引:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructField, DoubleType} 

val df = Seq(("a", 2), ("b", 1), ("c", 0)).toDF("x", "y") 
val aList = List(1.0, -1.0, 0.0) 

val rows = df.rdd.zipWithIndex.map(_.swap) 
    .join(sc.parallelize(aList).zipWithIndex.map(_.swap)) 
    .values 
    .map { case (row: Row, x: Double) => Row.fromSeq(row.toSeq :+ x) } 

sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false)) 

另一個類似的方法是指標和使用,UDF來處理餘下的:

import scala.util.Try 

val indexedDf = sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map { 
    case (row: Row, i: Long) => Row.fromSeq(row.toSeq :+ i) 
    }, 
    df.schema.add("idx_", "long") 
) 

def addValue(vs: Vector[Double]) = udf((i: Long) => Try(vs(i.toInt)).toOption) 

indexedDf.withColumn("z", addValue(aList.toVector)($"idx_")) 

不幸的是這兩種解決方案會從問題的影響。首先通過驅動程序傳遞本地數據會在您的程序中引入嚴重的瓶頸。通常數據應該直接從執行者那裏訪問。另一個問題是如果你想迭代執行這個操作,就會增加RDD譜系。

雖然第二個問題可以通過檢查點來解決,但第一個問題通常會使這個想法毫無用處。我強烈建議你首先構建完整的結構,然後在Spark上讀取它,或者以可以利用Spark體系結構的方式重新構建管道。例如,如果數據來自外部源,則使用map/mapPartitions直接對每個數據塊執行讀取操作。