First, the reason for the failure can be seen clearly in the signature of createDataFrame:
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
The type A is bounded to be a subclass of scala.Product. Your RDD contains Array[String], which is not such a subclass. If you really want to, you could artificially wrap the array in a Tuple1 (which extends Product) and get this to work:
val csvrdd: RDD[Tuple1[Array[String]]] = df
.select(df("Body").cast("string"))
.rdd
.map{ x:Row => x.getAs[String](0)}
.map(x => Tuple1(x.split(","))) // wrapping with a Tuple1, which extends scala.Product
val dfWithoutSchema = spark.createDataFrame(csvrdd) // now this overload works
dfWithoutSchema.printSchema()
// root
// |-- _1: array (nullable = true)
// | |-- element: string (containsNull = true)
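Note that a case class would satisfy the same bound, since every case class extends scala.Product. A minimal sketch (the Csv class name below is hypothetical and not part of the original code):
// hypothetical case class; case classes extend scala.Product, so the same createDataFrame overload accepts them
case class Csv(fields: Array[String])
val csvCaseClassRdd: RDD[Csv] = df
  .select(df("Body").cast("string"))
  .rdd
  .map { x: Row => x.getAs[String](0) }
  .map(x => Csv(x.split(",")))
val dfFromCaseClass = spark.createDataFrame(csvCaseClassRdd)
// expected schema: a single "fields" column of array<string>, analogous to the Tuple1 version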
However, this does not seem very useful. It would only create a single-column DataFrame of type ArrayType, and the same thing can be achieved much more simply with the split function from org.apache.spark.sql.functions:
val withArray = df.select(split(df("Body").cast("string"), ",") as "arr")
withArray.printSchema()
// root
// |-- arr: array (nullable = true)
// | |-- element: string (containsNull = true)
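Individual elements of this array column can then be read with getItem (which is also what the next step relies on); a quick usage example, assuming import spark.implicits._ is in scope as in the later snippets:
// select the first "CSV part" of every row; getItem returns null if the index is out of bounds
withArray.select($"arr".getItem(0) as "col0").show()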
Alternatively, if you want a DataFrame in which every "CSV column" becomes a separate column, you would have to "decide" on a schema common to all records (not all records necessarily have the same number of "CSV parts"). You can do this by adding another scan over the DataFrame to compute the maximum number of columns needed, and then letting Spark "fill in the blanks" with null wherever the actual value contains fewer parts:
// first - split String into array of Strings
val withArray = df.select(split(df("Body").cast("string"), ",") as "arr")
// optional - calculate the *maximum* number of columns;
// If you know it to begin with (e.g. "CSV cannot exceed X columns") -
// you can skip this and use that known value
val maxLength: Int = withArray.select(size($"arr") as "size")
.groupBy().max("size")
.first().getAs[Int](0)
// Create the individual columns, with nulls where the arrays were shorter than maxLength
val columns = (0 until maxLength).map(i => $"arr".getItem(i) as s"col$i")
// select these columns
val result = withArray.select(columns: _*)
result.printSchema() // in my example, maxLength = 4
// root
// |-- col0: string (nullable = true)
// |-- col1: string (nullable = true)
// |-- col2: string (nullable = true)
// |-- col3: string (nullable = true)
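For reference, here is a minimal, self-contained sketch of the whole split-into-columns pipeline, assuming a local SparkSession and hypothetical sample data standing in for the real "Body" column:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{split, size}
val spark = SparkSession.builder().master("local[*]").appName("csv-columns").getOrCreate()
import spark.implicits._
// hypothetical sample data in place of the real "Body" column
val df = Seq("a,b,c,d", "x,y", "1,2,3").toDF("Body")
val withArray = df.select(split($"Body", ",") as "arr")
val maxLength = withArray.select(size($"arr") as "size")
  .groupBy().max("size")
  .first().getAs[Int](0)
val columns = (0 until maxLength).map(i => $"arr".getItem(i) as s"col$i")
withArray.select(columns: _*).show()
// rows with fewer than maxLength parts get nulls in the trailing columns,
// e.g. "x,y" becomes (x, y, null, null)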
Let's say all the values of "Body" are CSVs with 4 columns (e.g. '"a,b,c,d"') - would you then expect the resulting DataFrame to have 4 String columns, or a single Array column?