0
我有,我想申請到.csv文件的每一行功能的工作原理:的NullPointerException應用功能引發RDD,關於非RDD
def convert(inString: Array[String]) : String = {
val country = inString(0)
val sellerId = inString(1)
val itemID = inString(2)
try{
val minidf = sqlContext.read.json(sc.makeRDD(inString(3):: Nil))
.withColumn("country", lit(country))
.withColumn("seller_id", lit(sellerId))
.withColumn("item_id", lit(itemID))
val finalString = minidf.toJSON.collect().mkString(",")
finalString
} catch{
case e: Exception =>println("AN EXCEPTION "+inString.mkString(","))
("this is an exception "+e+" "+inString.mkString(","))
}
}
此功能轉換排序的條目:
CA 112578240 132080411845 [{"id":"general_spam_policy","severity":"critical","timestamp":"2017-02-26T08:30:16Z"}]
在那裏我有4列,第4是一個JSON斑點,到
[{"country":"CA", "seller":112578240", "product":112578240, "id":"general_spam_policy","severity":"critical","timestamp":"2017-02-26T08:30:16Z"}]
這是json對象,前三列已插入第四列。現在
,這個工程:
val conv_string = sc.textFile(path_to_file).map(_.split('\t')).collect().map(x => convert(x))
或本:
val conv_string = sc.textFile(path_to_file).map(_.split('\t')).take(10).map(x => convert(x))
但確實不
val conv_string = sc.textFile(path_to_file).map(_.split('\t')).map(x => convert(x))
最後一個拋出java.lang.NullPointerException
。
我包含一個try catch
子句,以便查看這個失敗究竟在哪裏,並且它對每一行都是失敗的。
我在做什麼錯在這裏?
你不能把'sqlContext'在Spark地圖,因爲這個對象只能在驅動程序節點上存在。 – jamborta
我明白了。任何特別的建議來解析json對象,插入新的信息,並回寫到json字符串? – elelias
噴霧JSON可能是最好的使用:https://github.com/spray/spray-json – jamborta