
I have loaded text file data like the following into a Spark DataFrame, and I am getting an error when running a simple count operation:

1 The Nightmare Before Christmas 1993 3.9 4568 
2 The Mummy 1932 3.5 4388 
3 Orphans of the Storm 1921 3.2 9062 
4 The Object of Beauty 1991 2.8 6150 
5 Night Tide 1963 2.8 5126 
6 One Magic Christmas 1985 3.8 5333 
7 Muriel's Wedding 1994 3.5 6323 
8 Mother's Boys 1994 3.4 5733 
9 Nosferatu: Original Version 1929 3.5 5651 
10 Nick of Time 1995 3.4 5333 
11 Broken Blossoms 1919 3.3 5367 
12 Big Night 1996 3.6 6561 
13 The Birth of a Nation 1915 2.9 12118 
14 The Boys from Brazil 1978 3.6 7417 

Step 1: Use the following commands:

//Loaded into rdd 
val rddLoad = sc.textFile("/user/rahulm.3564_gmail/IMDB_DATA.txt"); 
//Split based on commas since it is a comma separated file 
val rddLoadSplit = rddLoad.map(_.split(',')) 
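A quick sanity check at this point (a debugging sketch, not part of the original post) is to tally how many fields the split actually produced; if the file is really tab-separated, splitting on ',' yields mostly single-element arrays:

// Debugging sketch: count lines by the number of fields after the split.
// A well-formed comma-separated row yields 5 fields; a tab-separated row
// split on ',' yields just 1 (the whole line).
rddLoadSplit.map(_.length).countByValue().foreach(println)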

Step 2: Then create the DataFrame as follows:

case class MovieData(serialNo:Int, movieName:String, releaseYear:Int, rating:Double, runningTime:Int); 

val dfSchema = rddLoadSplit.map {case Array(serialNo, movieName, releaseYear, rating, runningTime) => MovieData(serialNo.toInt, movieName, releaseYear.toInt, rating.toDouble, runningTime.toInt)}.toDF(); 

Step 3: dfSchema.show gives the correct results:

scala> dfSchema.show 

only showing top 20 rows 

Step 4:

Now, when I run dfSchema.count, I get the following error:

17/08/28 11:36:24 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 47, ip-172-31-58-214.ec2.internal): scala.MatchError: [Ljava.lang.String;@62202bc4 (of class [Ljava.lang.String;) 
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34) 
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:505) 
     at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686) 
     at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95) 
     at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86) 
     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Is your data comma-separated or tab-separated? Your data looks tab-separated, but your code splits on commas. Why? –

Answer


This happens because the data contains malformed rows that do not follow the expected format, so the pattern match fails with scala.MatchError here:

{case Array(serialNo, movieName, releaseYear, rating, runningTime) => MovieData(serialNo.toInt, movieName, releaseYear.toInt, rating.toDouble, runningTime.toInt)} 
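For example (a hypothetical illustration prompted by the comment above), if the file is actually tab-separated, splitting on ',' returns a one-element array holding the whole line, which cannot match a five-element Array pattern:

// Hypothetical tab-separated line from the sample data above.
val line = "1\tThe Nightmare Before Christmas\t1993\t3.9\t4568"
val fields = line.split(',')   // Array with one element: the entire line
// fields match { case Array(a, b, c, d, e) => ... }   // scala.MatchError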

I would use collect with a PartialFunction (you may still have to handle exceptions thrown during the casts):

rddLoadSplit.collect { 
    case Array(serialNo, movieName, releaseYear, rating, runningTime) => MovieData(serialNo.toInt, movieName, releaseYear.toInt, rating.toDouble, runningTime.toInt) 
}.toDF(); 
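As a minimal sketch of also guarding against the casting exceptions mentioned above (my addition, not from the original answer), the numeric conversions can be wrapped in scala.util.Try, so rows that fail to parse are dropped along with rows of the wrong shape:

import scala.util.Try

// Keep only rows that match the expected shape AND parse numerically;
// Try(...).toOption turns a parse failure into None, which flatMap discards.
val moviesDF = rddLoadSplit.flatMap {
  case Array(serialNo, movieName, releaseYear, rating, runningTime) =>
    Try(MovieData(serialNo.trim.toInt, movieName, releaseYear.trim.toInt,
      rating.trim.toDouble, runningTime.trim.toInt)).toOption
  case _ => None
}.toDF()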

or use the csv reader with the appropriate delimiter and mode set to DROPMALFORMED:

spark.read.format("csv").option("delimiter", "\t").option("mode", "DROPMALFORMED").load("/user/rahulm.3564_gmail/IMDB_DATA.txt")
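A slightly fuller sketch (the explicit schema below is my assumption, mirroring the MovieData case class) that also gives the columns proper names and types instead of the inferred defaults:

import org.apache.spark.sql.types._

// Assumed schema matching the MovieData case class above.
val movieSchema = StructType(Seq(
  StructField("serialNo", IntegerType),
  StructField("movieName", StringType),
  StructField("releaseYear", IntegerType),
  StructField("rating", DoubleType),
  StructField("runningTime", IntegerType)))

val moviesDF = spark.read
  .format("csv")
  .option("delimiter", "\t")         // the sample data looks tab-separated
  .option("mode", "DROPMALFORMED")   // silently drop rows that fail to parse
  .schema(movieSchema)
  .load("/user/rahulm.3564_gmail/IMDB_DATA.txt")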