Java Spark DataFrameReader java.lang.NegativeArraySizeException

I'm learning Spark and trying to read a .csv file as a DataFrame using DataFrameReader in Java, but I can't get even a super-simple .csv file to work: I keep getting a java.lang.NegativeArraySizeException.

Here is what I'm doing:

public void test() { 
    DataFrameReader dataFrameReader = new DataFrameReader(getSparkSession()); 

    StructType parentSchema = new StructType(new StructField[] { 
      DataTypes.createStructField("NAME", DataTypes.StringType, false), 
    }); 

    Dataset<Row> parents = dataFrameReader.schema(parentSchema).csv("/Users/mjsee/Downloads/test.csv"); 
    parents.show(); 
} 

and here is how I set up my Spark session:

sparkSession = SparkSession.builder() 
       .appName(getApplicationName()) 
       .master("local[*]") 
       .config("spark.driver.host", "localhost") 
       .getOrCreate(); 

and here is my test.csv file:

"JESSE" 

and here is my output:

java.lang.NegativeArraySizeException 
    at com.univocity.parsers.common.input.DefaultCharAppender.<init>(DefaultCharAppender.java:39) ~[Univocity-Parsers-2.x.jar:?] 
    at com.univocity.parsers.csv.CsvParserSettings.newCharAppender(CsvParserSettings.java:82) ~[Univocity-Parsers-2.x.jar:?] 
    at com.univocity.parsers.common.ParserOutput.<init>(ParserOutput.java:93) ~[Univocity-Parsers-2.x.jar:?] 
    at com.univocity.parsers.common.AbstractParser.<init>(AbstractParser.java:74) ~[Univocity-Parsers-2.x.jar:?] 
    at com.univocity.parsers.csv.CsvParser.<init>(CsvParser.java:59) ~[Univocity-Parsers-2.x.jar:?] 
    at org.apache.spark.sql.execution.datasources.csv.CsvReader.<init>(CSVParser.scala:49) ~[Spark-sql.jar:?] 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:158) ~[Spark-sql.jar:?] 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:146) ~[Spark-sql.jar:?] 
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:138) ~[Spark-sql.jar:?] 
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:122) ~[Spark-sql.jar:?] 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:150) ~[Spark-sql.jar:?] 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102) ~[Spark-sql.jar:?] 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) ~[?:?] 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[Spark-sql.jar:?] 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) ~[Spark-sql.jar:?] 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) ~[Spark-sql.jar:?] 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) ~[Spark-sql.jar:?] 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) ~[Spark-core.jar:?] 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) ~[Spark-core.jar:?] 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) ~[Spark-core.jar:?] 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) ~[Spark-core.jar:?] 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) ~[Spark-core.jar:?] 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) ~[Spark-core.jar:?] 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) ~[Spark-core.jar:?] 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) [Spark-core.jar:?] 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131] 
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131] 
15:45:29.544 [task-result-getter-0] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 0.0 failed 1 times; aborting job 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NegativeArraySizeException 
    at com.univocity.parsers.common.input.DefaultCharAppender.<init>(DefaultCharAppender.java:39) 
    at com.univocity.parsers.csv.CsvParserSettings.newCharAppender(CsvParserSettings.java:82) 
    at com.univocity.parsers.common.ParserOutput.<init>(ParserOutput.java:93) 
    at com.univocity.parsers.common.AbstractParser.<init>(AbstractParser.java:74) 
    at com.univocity.parsers.csv.CsvParser.<init>(CsvParser.java:59) 
    at org.apache.spark.sql.execution.datasources.csv.CsvReader.<init>(CSVParser.scala:49) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:158) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:146) 
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:138) 
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:122) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:150) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

Driver stacktrace: 

    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) 
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112) 
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2112) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2327) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:248) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:636) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:595) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:604) 
    at ModelProcessingTest.testSTUFF(ModelProcessingTest.java:86) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) 
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
    at org.mockito.internal.runners.JUnit45AndHigherRunnerImpl.run(Unknown Source) 
    at org.mockito.runners.MockitoJUnitRunner.run(Unknown Source) 
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137) 
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) 
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51) 
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237) 
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) 
Caused by: java.lang.NegativeArraySizeException 
    at com.univocity.parsers.common.input.DefaultCharAppender.<init>(DefaultCharAppender.java:39) 
    at com.univocity.parsers.csv.CsvParserSettings.newCharAppender(CsvParserSettings.java:82) 
    at com.univocity.parsers.common.ParserOutput.<init>(ParserOutput.java:93) 
    at com.univocity.parsers.common.AbstractParser.<init>(AbstractParser.java:74) 
    at com.univocity.parsers.csv.CsvParser.<init>(CsvParser.java:59) 
    at org.apache.spark.sql.execution.datasources.csv.CsvReader.<init>(CSVParser.scala:49) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:158) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:146) 
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:138) 
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:122) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:150) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

Is '/Users/mjsee/Downloads/test.csv' an HDFS path on your edge node, or a path on your local machine? –


It's a path on my local machine – mjsee

Answers


Author of the univocity-parsers library here. This happens because Spark internally sets the maximum value length to -1 (meaning no limit). Support for this was introduced in univocity-parsers version 2.2.0.

Just make sure the library version on your classpath is greater than 2.2.0 and you should be fine, as older versions do not support setting the maxCharsPerColumn property to -1.

If there are multiple versions of the library on your classpath, remove the older ones. Ideally, update to the latest version (currently 2.4.1) and use only that. It should work just fine, as we make sure any change made to the library is backwards compatible.
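
If you're unsure which univocity-parsers version Spark is actually loading, here is a minimal standalone sketch (the class name UnivocityVersionCheck is just for illustration) that prints which jar the parser classes come from and reproduces the failure mode outside Spark: constructing a parser with maxCharsPerColumn set to -1 throws the same NegativeArraySizeException on versions older than 2.2.0, and works on newer ones.

import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;

public class UnivocityVersionCheck {
    public static void main(String[] args) {
        // Print which jar CsvParser was loaded from, to spot a stale
        // duplicate of univocity-parsers on the classpath.
        System.out.println(CsvParser.class.getProtectionDomain()
                .getCodeSource().getLocation());

        CsvParserSettings settings = new CsvParserSettings();
        // -1 means "no limit" (this is what Spark sets internally); versions
        // older than 2.2.0 turn it into a negative buffer size and throw
        // NegativeArraySizeException while constructing the parser.
        settings.setMaxCharsPerColumn(-1);

        // On 2.2.0 and later, the parser constructs and parses without issue.
        String[] row = new CsvParser(settings).parseLine("\"JESSE\"");
        System.out.println(row[0]); // JESSE
    }
}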


Could the problem be caused by the trailing comma after createStructField in your parentSchema?

StructType parentSchema = new StructType(new StructField[] { 
      DataTypes.createStructField("NAME", DataTypes.StringType, false) 
    }); 

Person (a CSV file under resources):
Jagan,Pantula,37,Singapore
Neeraja,Pantula,32,Singapore
Rama,Pantula,34,India
Rajya,Akundi,32,India
Viswanath,Pantula,42,India

Code

SparkSession session = getSession();  
DataFrameReader reader = new DataFrameReader(session); 
StructType parentSchema = new StructType(new StructField[] { 
    DataTypes.createStructField("FirstName", DataTypes.StringType, false), 
    DataTypes.createStructField("LastName", DataTypes.StringType, false), 
    DataTypes.createStructField("Age", DataTypes.IntegerType, false), 
    DataTypes.createStructField("Country", DataTypes.StringType, false) 
}); 

// Resolve the "Person" resource from the test classpath; substring(1)
// drops the leading '/' from the resource URL's path (e.g. "/C:/..." on Windows).
String path = getClass().getClassLoader()
                        .getResource("Person")
                        .getPath()
                        .substring(1);
reader.schema(parentSchema).csv(path).show(); 

Output

+---------+--------+---+---------+ 
|FirstName|LastName|Age| Country| 
+---------+--------+---+---------+ 
| Jagan| Pantula| 37|Singapore| 
| Neeraja| Pantula| 32|Singapore| 
|  Rama| Pantula| 34| India| 
| Rajya| Akundi| 32| India| 
|Viswanath| Pantula| 42| India| 
+---------+--------+---+---------+