我想在sparkSQLexamample runProgramaticSchemaExample上走得更遠,並且無法處理動態列數。請參閱此代碼,其中唯一更改是在for循環中指定Row的列映射。Spark:Programatic模式動態列映射
private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
import spark.implicits._
// $example on:programmatic_schema$
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
// .map(attributes => Row(attributes(0), attributes(1).trim))
.map(attributes => Row(for (i <- 0 to (attributes.length -1)){attributes(i)}))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
peopleDF.printSchema()
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
// $example off:programmatic_schema$
}
}
以下是錯誤當我得到
16/11/15 09:31:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.runtime.BoxedUnit is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#0
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
:- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
: :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
: : +- input[0, org.apache.spark.sql.Row, true]
: +- 0
:- null
+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType)
+- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
+- input[0, org.apache.spark.sql.Row, true]
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) AS age#1
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true)
:- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
: :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
: : +- input[0, org.apache.spark.sql.Row, true]
: +- 1
:- null
+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true)
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType)
+- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
+- input[0, org.apache.spark.sql.Row, true]
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)