2017-03-06 51 views
1

我一直在試圖獲得org.apache.spark.sql.explode的動態版本,但沒有運氣:我有一個數據集,其日期列名爲event_date,另一列名爲no_of_days_gap。我想使用no_of_days_gap使用explode函數創建行的克隆。我最初的嘗試是使用這樣的:如何使用列值爆炸?

myDataset.withColumn("clone", explode(array((0 until col("no_of_days_gap")).map(lit): _*))) 

然而,col("no_of_days_gap")Column型和Int預期。我也嘗試了其他各種方法。那麼我怎麼才能做到這一點?

P.S .:我設法使用map功能,然後打電話flatMap,但我真的很想了解如何讓withColumn的方法工作。

回答

0

有關下列哪些?

scala> val diddy = Seq(
    | ("2017/03/07", 4), 
    | ("2016/12/09", 2)).toDF("event_date", "no_of_days_gap") 
diddy: org.apache.spark.sql.DataFrame = [event_date: string, no_of_days_gap: int] 

scala> diddy.flatMap(r => Seq.fill(r.getInt(1))(r.getString(0))).show 
+----------+ 
|  value| 
+----------+ 
|2017/03/07| 
|2017/03/07| 
|2017/03/07| 
|2017/03/07| 
|2016/12/09| 
|2016/12/09| 
+----------+ 

// use explode instead 

scala> diddy.explode("no_of_days_gap", "events") { n: Int => 0 until n }.show 
warning: there was one deprecation warning; re-run with -deprecation for details 
+----------+--------------+------+ 
|event_date|no_of_days_gap|events| 
+----------+--------------+------+ 
|2017/03/07|    4|  0| 
|2017/03/07|    4|  1| 
|2017/03/07|    4|  2| 
|2017/03/07|    4|  3| 
|2016/12/09|    2|  0| 
|2016/12/09|    2|  1| 
+----------+--------------+------+ 

如果你堅持withColumn,那麼......是......它! 扣緊!

diddy 
    .withColumn("concat", concat($"event_date", lit(","))) 
    .withColumn("repeat", expr("repeat(concat, no_of_days_gap)")) 
    .withColumn("split", split($"repeat", ",")) 
    .withColumn("explode", explode($"split")) 
0

你必須使用UDF:

val range = udf((i: Integer) => (0 until i).toSeq) 

df 
    .withColumn("clone", range($"no_of_days_gap")) // Add range 
    .withColumn("clone", explode($"clone")) // Explode 
+0

在火花殼這將返回一個錯誤: '''階> VAL範圍= UDF((ⅰ:整數)=>(0直到ⅰ).toSeq) scala.MatchError:階。 collection.immutable.Range(類scala.reflect.internal.Types $ ClassNoArgsTypeRef) at org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor(ScalaReflection.scala:692) at org.apache.spark.sql .catalyst.ScalaReflection $ .schemaFor(ScalaReflection.scala:671) at org.apache.spark.sql.functions $ .udf(functions.scala:3072) ... 48 elided''' – Diddy