3
我有List[Double]
,如何將其轉換爲org.apache.spark.sql.Column
。我試圖將其作爲列使用.withColumn()
插入到現有的DataFrame
中。如何將列表[Double]轉換爲列?
我有List[Double]
,如何將其轉換爲org.apache.spark.sql.Column
。我試圖將其作爲列使用.withColumn()
插入到現有的DataFrame
中。如何將列表[Double]轉換爲列?
它不能直接完成。 Column
不是數據結構,而是特定SQL表達式的表示。它不受特定數據的約束。你必須先轉換你的數據。接近這一點的一種方式是parallelize
和join
通過索引:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, DoubleType}
val df = Seq(("a", 2), ("b", 1), ("c", 0)).toDF("x", "y")
val aList = List(1.0, -1.0, 0.0)
val rows = df.rdd.zipWithIndex.map(_.swap)
.join(sc.parallelize(aList).zipWithIndex.map(_.swap))
.values
.map { case (row: Row, x: Double) => Row.fromSeq(row.toSeq :+ x) }
sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))
另一個類似的方法是指標和使用,UDF來處理餘下的:
import scala.util.Try
val indexedDf = sqlContext.createDataFrame(
df.rdd.zipWithIndex.map {
case (row: Row, i: Long) => Row.fromSeq(row.toSeq :+ i)
},
df.schema.add("idx_", "long")
)
def addValue(vs: Vector[Double]) = udf((i: Long) => Try(vs(i.toInt)).toOption)
indexedDf.withColumn("z", addValue(aList.toVector)($"idx_"))
不幸的是這兩種解決方案會從問題的影響。首先通過驅動程序傳遞本地數據會在您的程序中引入嚴重的瓶頸。通常數據應該直接從執行者那裏訪問。另一個問題是如果你想迭代執行這個操作,就會增加RDD譜系。
雖然第二個問題可以通過檢查點來解決,但第一個問題通常會使這個想法毫無用處。我強烈建議你首先構建完整的結構,然後在Spark上讀取它,或者以可以利用Spark體系結構的方式重新構建管道。例如,如果數據來自外部源,則使用map
/mapPartitions
直接對每個數據塊執行讀取操作。
'List [Double]'中的Double元素是什麼? –
@JacekLaskowski,它只是一個數字列表(雙數據類型),我想添加爲現有數據框中的列。 – vdep
@vdep什麼是標題編輯?我不明白。 – eliasah