2017-04-09 39 views
0

我有,我想申請到.csv文件的每一行功能的工作原理:的NullPointerException應用功能引發RDD,關於非RDD

def convert(inString: Array[String]) : String = { 

    val country = inString(0) 
    val sellerId = inString(1) 
    val itemID = inString(2) 
    try{ 
    val minidf = sqlContext.read.json(sc.makeRDD(inString(3):: Nil)) 
     .withColumn("country", lit(country)) 
     .withColumn("seller_id", lit(sellerId)) 
     .withColumn("item_id", lit(itemID)) 
     val finalString = minidf.toJSON.collect().mkString(",") 
     finalString 
    } catch{ 
     case e: Exception =>println("AN EXCEPTION "+inString.mkString(",")) 
     ("this is an exception "+e+" "+inString.mkString(",")) 
    } 
} 

此功能轉換排序的條目:

CA  112578240  132080411845 [{"id":"general_spam_policy","severity":"critical","timestamp":"2017-02-26T08:30:16Z"}] 

在那裏我有4列,第4是一個JSON斑點,到

[{"country":"CA", "seller":112578240", "product":112578240, "id":"general_spam_policy","severity":"critical","timestamp":"2017-02-26T08:30:16Z"}] 

這是json對象,前三列已插入第四列。現在

,這個工程:

val conv_string = sc.textFile(path_to_file).map(_.split('\t')).collect().map(x => convert(x)) 

或本:

val conv_string = sc.textFile(path_to_file).map(_.split('\t')).take(10).map(x => convert(x)) 

但確實

val conv_string = sc.textFile(path_to_file).map(_.split('\t')).map(x => convert(x)) 

最後一個拋出java.lang.NullPointerException

我包含一個try catch子句,以便查看這個失敗究竟在哪裏,並且它對每一行都是失敗的。

我在做什麼錯在這裏?

+1

你不能把'sqlContext'在Spark地圖,因爲這個對象只能在驅動程序節點上存在。 – jamborta

+0

我明白了。任何特別的建議來解析json對象,插入新的信息,並回寫到json字符串? – elelias

+1

噴霧JSON可能是最好的使用:https://github.com/spray/spray-json – jamborta

回答