Spark: programmatic schema with dynamic column mapping

2016-11-15

I want to go a step further than the Spark SQL example runProgrammaticSchemaExample and cannot handle a dynamic number of columns. See the code below; the only change is how the Row's column mapping is specified in the for loop.

private def runProgrammaticSchemaExample(spark: SparkSession): Unit = { 
    import spark.implicits._ 
    // $example on:programmatic_schema$ 
    // Create an RDD 
    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") 

    // The schema is encoded in a string 
    val schemaString = "name age" 

    // Generate the schema based on the string of schema 
    val fields = schemaString.split(" ") 
     .map(fieldName => StructField(fieldName, StringType, nullable = true)) 
    val schema = StructType(fields) 

    // Convert records of the RDD (people) to Rows 
    val rowRDD = peopleRDD 
     .map(_.split(",")) 
    //  .map(attributes => Row(attributes(0), attributes(1).trim)) 
     .map(attributes => Row(for (i <- 0 to (attributes.length -1)){attributes(i)})) 

    // Apply the schema to the RDD 
    val peopleDF = spark.createDataFrame(rowRDD, schema) 

    // Creates a temporary view using the DataFrame 
    peopleDF.createOrReplaceTempView("people") 
    peopleDF.printSchema() 
    // SQL can be run over a temporary view created using DataFrames 
    val results = spark.sql("SELECT name FROM people") 

    // The results of SQL queries are DataFrames and support all the normal RDD operations 
    // The columns of a row in the result can be accessed by field index or by field name 
    results.map(attributes => "Name: " + attributes(0)).show() 
    // +-------------+ 
    // |  value| 
    // +-------------+ 
    // |Name: Michael| 
    // | Name: Andy| 
    // | Name: Justin| 
    // +-------------+ 
    // $example off:programmatic_schema$ 
    } 

Here is the error I get:

16/11/15 09:31:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.runtime.BoxedUnit is not a valid external type for schema of string 
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#0 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 0 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) AS age#1 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 1 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
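
The BoxedUnit in the message points at the for loop: in Scala, a for loop without yield is a statement that evaluates to Unit, so Row(...) here receives a single BoxedUnit value instead of one value per column. A minimal sketch of the difference (illustrative only, not part of the example code):

import org.apache.spark.sql.Row

val attributes = Array("Michael", " 29")

// The for loop returns () (Unit), so this builds Row(()) --
// a one-column Row whose value is scala.runtime.BoxedUnit.
val badRow = Row(for (i <- 0 to (attributes.length - 1)) { attributes(i) })

// Row.fromSeq spreads the sequence into one column per element instead.
val goodRow = Row.fromSeq(attributes.map(_.trim).toSeq)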

Answers


I had exactly the same problem as you and started out trying to solve it the same way, which produced the same error :) I'm new to Scala, but I managed to come up with a function that generates the Row object (you just need to pass in your field count).

Get the field count:

// Count the fields in the first record ("\u0001" is the field delimiter in my data)
val fieldCount = rdd.map(_.split("\u0001")).map(x => x.size).first()

Function to generate the Row object:

def getARow(x: Array[String], size: Int): Row = {
  val columnArray = new Array[String](size + 1)
  for (i <- 0 to size) {
    columnArray(i) = x(i).toString()
  }
  Row.fromSeq(columnArray)
}

Use the RDD and the schema to create your DataFrame:

val myDF = sqlContext.createDataFrame(rdd.map(_.split(delim)).map { x => getARow(x,fieldCount) }, mySchema) 
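
A quick sanity check on the result (using the myDF from the line above; the exact output depends on your data and schema):

// The schema should list one column per field, and show() should display
// every column that was split out of the raw records.
myDF.printSchema()
myDF.show(5)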

Hope this helps someone!


Daniel's answer is good, but I found a small problem with it; I modified it and now it works:

// Derive the column count from the schema string instead of hard-coding it
val fieldCount = schemaString.split(" ").length

def getARow(x: Array[String], size: Int): Row = {
  val columnArray = new Array[String](size)
  for (i <- 0 to (size - 1)) {
    columnArray(i) = x(i).toString()
  }
  Row.fromSeq(columnArray)
}

val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => getARow(attributes, fieldCount))

val peopleDF = spark.createDataFrame(rowRDD, schema)
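
As a side note, the intermediate array is not strictly needed: Row.fromSeq accepts any Seq, so the same mapping can be written inline. A sketch against the same peopleRDD and schema as above:

// Row.fromSeq turns each split record directly into a Row with one
// column per element, however many fields the line happens to have.
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row.fromSeq(attributes.map(_.trim).toSeq))

val peopleDF = spark.createDataFrame(rowRDD, schema)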