2015-07-21 124 views
0

我有一些CSV文件,需要將它們按部分文件名合併到一個RDD中。Spark&Scala - RDD遍歷中的NullPointerException

例如,對於下面的文件

$ ls 
20140101_1.csv 20140101_3.csv 20140201_2.csv 20140301_1.csv 
20140301_3.csv 20140101_2.csv 20140201_1.csv 20140201_3.csv 

我需要將文件與名稱20140101*.csv成RDD結合起來,就可以等工作。

我正在使用sc.wholeTextFiles來讀取整個目錄,然後按它們的圖案對文件名進行分組,以形成一串文件名。 然後,我將字符串傳遞給sc.textFile以將文件作爲單個RDD打開。

這是我的代碼 -

val files = sc.wholeTextFiles("*.csv") 
val indexed_files = files.map(a => (a._1.split("_")(0),a._1)) 
val data = indexed_files.groupByKey 

data.map { a => 
    var name = a._2.mkString(",") 
    (a._1, name) 
} 

data.foreach { a => 
    var file = sc.textFile(a._2) 
    println(file.count) 
} 

我也得到SparkException - NullPointerException當我嘗試打電話textFile。錯誤堆棧是指RDD內的一個迭代器。我無法理解的錯誤 -

15/07/21 15:37:37 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose tasks have all completed, from pool 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 65.0 failed 4 times, most recent failure: Lost task 1.3 in stage 65.0 (TID 115, 10.132.8.10): java.lang.NullPointerException 
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33) 
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) 

然而,當我在火花殼做sc.textFile(data.first._2).count,我能夠形成RDD並能夠檢索計數。

任何幫助,非常感謝。

+1

「var file = sc.textFile(a._2)」在另一個rdd映射的foreach中不起作用。你不能像這樣嵌套RDD。 –

+0

謝謝@保羅......我糾正了它,現在我可以創建RDD。請回答這個問題,我會接受它。 –

回答

2

評論轉換爲一個答案:

var file = sc.textFile(a._2) 

另一個RDD的foreach內部行不通的。你不能像這樣嵌套RDD。