
Spark 2.1 (standalone): after adding a VectorAssembler column on Windows, the Spark DataFrame can no longer be saved to a Parquet file; the write fails with SparkException: Values to assemble cannot be null. Saving the DataFrame before the vector column is added works without problems, and none of the "features" are null (NVL is used).

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.VectorAssembler

val conf = new SparkConf().setAppName("RandomForestModelML").setMaster("local")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()

// read the CSV and register it as a temporary view
sparkSession.read
  .option("header", true)
  .option("delimiter", ";")
  .csv("C:\\tmp\\file2.csv")
  .createOrReplaceTempView("features")

// replace nulls with 0 and cast the feature columns to int (remaining columns elided)
var data = sparkSession.sql("select cast(NVL(c9003,0) as int) as c9003.. from features")

data = data.withColumnRenamed("target", "label")
val ignored = List("label", "c0025", "c9052", "c0019")

// assemble every non-ignored column into a single vector column
val featureAssembler = new VectorAssembler()
  .setInputCols(data.columns.diff(ignored))
  .setOutputCol("features")
data = featureAssembler.transform(data)
data.printSchema()
data.show(5)

data.write.format("parquet").save("C:\\tmp\\features.parquet")

Output of printSchema and show(5):

root 
|-- c9003: integer (nullable = true) 
|-- c0022: integer (nullable = true) 
|-- c9014: integer (nullable = true) 
|-- c9008: integer (nullable = true) 
|-- a8401: integer (nullable = true) 
|-- c0021: double (nullable = true) 
|-- c0025: string (nullable = true) 
|-- d1417: integer (nullable = true) 
|-- d0006: integer (nullable = true) 
|-- c9052: string (nullable = true) 
|-- c0023: integer (nullable = true) 
|-- d1501: integer (nullable = true) 
|-- c0020: integer (nullable = true) 
|-- d0007: integer (nullable = true) 
|-- c0024: integer (nullable = true) 
|-- c4018: integer (nullable = true) 
|-- at180: integer (nullable = true) 
|-- c1421: integer (nullable = true) 
|-- label: integer (nullable = true) 
|-- features: vector (nullable = true) 


+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------------------+ 
|c9003|c0022|c9014|c9008|a8401|c0021|    c0025|d1417|d0006|c9052|c0023|d1501|c0020|d0007|c0024|c4018|at180|c1421|label|   features| 
+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------------------+ 
|   10|    1|    4|   53|    0|  0.0|FHB KERESKEDELMI ...|    0|    0|    P|    2|    0|    1|    0|    0|    0|    0|    1|    0|(16,[0,1,2,3,8,10...|
|   10|    1|    3|   69|    0| 0.01|        MKB BANK ZRT|    1|    0|    P|    0|    0|    1|    0|    0|    0|    0|    1|    0|(16,[0,1,2,3,5,6,...|
|  100|    2|    4|   57|    0| 0.24|SANTANDER CONSUME...|    1|   18|    P|    2|    1|    1|    0|    0|    0|    0|    1|    0|[100.0,2.0,4.0,57...|
|  100|    2|    5|   55|    0| 0.09|SANTANDER CONSUME...|    0|    0|    P|    0|    0|    1|    0|    0|    0|    0|    1|    0|(16,[0,1,2,3,5,10...|
|   10|    3|    2|   65|    4| 0.23|RAIFFEISEN BANK ZRT.|    2|   14|    P|    0|    2|    1|    0|    0|    0|    0|    1|    0|[10.0,3.0,2.0,65....|
+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------------------+ 
only showing top 5 rows 

And the exception:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<c9003_double_vecAssembler_41f4486b7bab:double,c0022_double_vecAssembler_41f4486b7bab:double,c9014_double_vecAssembler_41f4486b7bab:double,c9008_double_vecAssembler_41f4486b7bab:double,a8401_double_vecAssembler_41f4486b7bab:double,c0021:double,d1417_double_vecAssembler_41f4486b7bab:double,d0006_double_vecAssembler_41f4486b7bab:double,c0023_double_vecAssembler_41f4486b7bab:double,d1501_double_vecAssembler_41f4486b7bab:double,c0020_double_vecAssembler_41f4486b7bab:double,d0007_double_vecAssembler_41f4486b7bab:double,c0024_double_vecAssembler_41f4486b7bab:double,c4018_double_vecAssembler_41f4486b7bab:double,at180_double_vecAssembler_41f4486b7bab:double,c1421_double_vecAssembler_41f4486b7bab:double>) => vector) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188) 
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null. 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160) 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) 
    at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143) 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99) 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98) 
    ... 16 more 
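
The stack trace points at VectorAssembler.assemble, which throws as soon as one of the values being assembled is null. show(5) only evaluates the first few rows, so a null further down the data only surfaces when the whole frame is written out. A minimal sketch that reproduces the same failure on toy data (illustrative column names and values, not the real file):

    import org.apache.spark.ml.feature.VectorAssembler

    // toy frame with a single null in column "x"
    val demo = sparkSession.createDataFrame(Seq(
      (Some(1), 2.0),
      (None: Option[Int], 3.0)
    )).toDF("x", "y")

    val asm = new VectorAssembler()
      .setInputCols(Array("x", "y"))
      .setOutputCol("features")

    // fails with "Values to assemble cannot be null" once the row containing the null is evaluated
    asm.transform(demo).show()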

UPDATE: It looks like a problem in the data, but I can't see where. I tried dropping every column except "label" and "c9003", and it works fine. It also works with some other columns, but it throws the exception on c9014. Now I query the DataFrame like this:

var data = sparkSession.sql("select NVL(target,0) as target, cast(NVL(c9003,0) as int) as c9003, cast(NVL(c9014,0) as int) as c9014 from features where c9014 is not null") 
data.show(20) 
+------+-----+-----+ 
|target|c9003|c9014| 
+------+-----+-----+ 
|     0|   10|    4|
|     0|   10|    3|
|     0|  100|    4|
|     0|  100|    5|
|     0|   10|    2|
|     0|   10|    6|
|     0|   10|    2|
|     0|   90|    4|
|     0|   80|    4|
|     0|   80|    5|
|     0|   10|    2|
|     0|   90|    8|
|     0|   90|    8|
|     0|   90|    8|
|     0|   90|    4|
|     0|   80|    5|
|     0|   80|    2|
|     0|   80|    2|
|     0|   90|    7|
|     0|   90|    8|
+------+-----+-----+ 
only showing top 20 rows 

After adding the vector column:

root 
|-- label: string (nullable = false) 
|-- c9003: integer (nullable = true) 
|-- c9014: integer (nullable = true) 
|-- features: vector (nullable = true) 

UPDATE 2: It looks like a memory / data-volume problem. I tried adding filters to the SQL:

  1. cast(NVL(c9014,9999) as int) > 1000  -> works fine

  2. cast(NVL(c9014,9999) as int) <= 1000 -> works fine

  3. no filter -> throws the exception

Any hints on memory tuning?
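
One thing worth checking before tuning memory (a sketch reusing the same "features" view): values of c9014 that are present but not parseable as integers come back as NULL from the cast, even though NVL already ran on the raw column.

    // rows where c9014 exists but cast(c9014 as int) yields NULL --
    // exactly the rows that later break VectorAssembler
    sparkSession.sql(
      "select c9014, count(*) as cnt from features " +
      "where c9014 is not null and cast(c9014 as int) is null " +
      "group by c9014").show()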


Could you share a few rows of the dataset? – BDR


I have updated the main post; it looks like the problem is in the c9014 column – Triffids

Answer


Problem solved. The issue was in the SQL query: "cast(NVL(c9014,0) as int) as c9014". This expression can still produce NULL (when c9014 holds a value that cannot be cast to int), so CAST() should be applied before NVL().
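
A sketch of the corrected pattern (cast first, so that NVL also catches the NULL produced when a value cannot be cast to int):

    // corrected order: cast first, then replace the resulting NULLs with 0
    var data = sparkSession.sql(
      "select NVL(cast(c9003 as int), 0) as c9003, " +
      "       NVL(cast(c9014 as int), 0) as c9014 " +
      "from features")

With the original order, NVL(c9014, 0) only replaces missing values; a non-numeric string passes through and the outer cast silently turns it into NULL, which VectorAssembler then rejects.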