0
我有一些CSV文件,需要將它們按部分文件名合併到一個RDD中。Spark&Scala - RDD遍歷中的NullPointerException
例如,對於下面的文件
$ ls
20140101_1.csv 20140101_3.csv 20140201_2.csv 20140301_1.csv
20140301_3.csv 20140101_2.csv 20140201_1.csv 20140201_3.csv
我需要將文件與名稱20140101*.csv
成RDD結合起來,就可以等工作。
我正在使用sc.wholeTextFiles
來讀取整個目錄,然後按它們的圖案對文件名進行分組,以形成一串文件名。 然後,我將字符串傳遞給sc.textFile以將文件作爲單個RDD打開。
這是我的代碼 -
val files = sc.wholeTextFiles("*.csv")
val indexed_files = files.map(a => (a._1.split("_")(0),a._1))
val data = indexed_files.groupByKey
data.map { a =>
var name = a._2.mkString(",")
(a._1, name)
}
data.foreach { a =>
var file = sc.textFile(a._2)
println(file.count)
}
我也得到SparkException - NullPointerException
當我嘗試打電話textFile
。錯誤堆棧是指RDD內的一個迭代器。我無法理解的錯誤 -
15/07/21 15:37:37 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 65.0 failed 4 times, most recent failure: Lost task 1.3 in stage 65.0 (TID 115, 10.132.8.10): java.lang.NullPointerException
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
然而,當我在火花殼做sc.textFile(data.first._2).count
,我能夠形成RDD並能夠檢索計數。
任何幫助,非常感謝。
「var file = sc.textFile(a._2)」在另一個rdd映射的foreach中不起作用。你不能像這樣嵌套RDD。 –
謝謝@保羅......我糾正了它,現在我可以創建RDD。請回答這個問題,我會接受它。 –