
I am unable to push JSON data into Hive. Below are the sample JSON data and what I have tried. Please suggest what I am missing to load this JSON data into Hive using Spark SQL.

The JSON data:

{ 
"Employees" : [ 
{ 
"userId":"rirani", 
"jobTitleName":"Developer", 
"firstName":"Romin", 
"lastName":"Irani", 
"preferredFullName":"Romin Irani", 
"employeeCode":"E1", 
"region":"CA", 
"phoneNumber":"408-1234567", 
"emailAddress":"[email protected]" 
}, 
{ 
"userId":"nirani", 
"jobTitleName":"Developer", 
"firstName":"Neil", 
"lastName":"Irani", 
"preferredFullName":"Neil Irani", 
"employeeCode":"E2", 
"region":"CA", 
"phoneNumber":"408-1111111", 
"emailAddress":"[email protected]" 
}, 
{ 
"userId":"thanks", 
"jobTitleName":"Program Directory", 
"firstName":"Tom", 
"lastName":"Hanks", 
"preferredFullName":"Tom Hanks", 
"employeeCode":"E3", 
"region":"CA", 
"phoneNumber":"408-2222222", 
"emailAddress":"[email protected]" 
} 
] 
} 

I tried to load it with the SQLContext jsonFile method, but it cannot parse the JSON:

val f = sqlc.jsonFile("file:///home/vm/Downloads/emp.json") 
f.show 

The error is: java.lang.RuntimeException: Failed to parse a value for data type StructType() (current token: VALUE_STRING)

I tried a different way, managed to hack around it, and got the schema:

val files = sc.wholeTextFiles("file:///home/vm/Downloads/emp.json")   
val jsonData = files.map(x => x._2) 
sqlc.jsonRDD(jsonData).registerTempTable("employee") 
val emp = sqlc.sql(
  "select Employees[1].userId as ID, Employees[1].jobTitleName as Title, " +
  "Employees[1].firstName as FirstName, Employees[1].lastName as LastName, " +
  "Employees[1].preferredFullName as PreferredName, Employees[1].employeeCode as empCode, " +
  "Employees[1].region as Region, Employees[1].phoneNumber as Phone, " +
  "Employees[1].emailAddress as email from employee")
emp.show // displays all the values 

I can get the data and the schema for each record, which is pretty much what I need, but I am missing how to get all of the records and load them into Hive.

Any help or suggestion is much appreciated.

Answers


Spark SQL only supports reading a JSON file when the file contains one JSON object per line.

SQLContext.scala

/**
 * Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
 * It goes through the entire dataset once to determine the schema.
 *
 * @group specificdata
 * @deprecated As of 1.4.0, replaced by `read().json()`. This will be removed in Spark 2.0.
 */
@deprecated("Use read.json(). This will be removed in Spark 2.0.", "1.4.0")
def jsonFile(path: String): DataFrame = {
  read.json(path)
}

Your file should look like this (strictly speaking, it is not a valid JSON file):

{"userId":"rirani","jobTitleName":"Developer","firstName":"Romin","lastName":"Irani","preferredFullName":"Romin Irani","employeeCode":"E1","region":"CA","phoneNumber":"408-1234567","emailAddress":"[email protected]"} 
{"userId":"nirani","jobTitleName":"Developer","firstName":"Neil","lastName":"Irani","preferredFullName":"Neil Irani","employeeCode":"E2","region":"CA","phoneNumber":"408-1111111","emailAddress":"[email protected]"} 
{"userId":"thanks","jobTitleName":"Program Directory","firstName":"Tom","lastName":"Hanks","preferredFullName":"Tom Hanks","employeeCode":"E3","region":"CA","phoneNumber":"408-2222222","emailAddress":"[email protected]"} 

Please take a look at the outstanding JIRA issue. It does not appear to be a priority, but it is at least on record.

You have two options:

  1. Convert your JSON data to the supported format, one object per line (see the sketch after this list)
  2. Have one file per JSON object - but this will result in far too many files.
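
A minimal sketch of option 1, assuming Spark 1.4+ (for DataFrameReader.json and the explode function) and the paths from the question; the emp_lines output directory is made up for illustration:

import org.apache.spark.sql.functions.{col, explode}

val whole = sc.wholeTextFiles("file:///home/vm/Downloads/emp.json").map(_._2)
val wrapped = sqlc.read.json(whole)                          // one record holding the Employees array
val flat = wrapped.select(explode(col("Employees")).as("e")) // one row per employee struct
flat.select("e.userId", "e.jobTitleName", "e.firstName", "e.lastName",
  "e.preferredFullName", "e.employeeCode", "e.region", "e.phoneNumber",
  "e.emailAddress").write.json("file:///home/vm/Downloads/emp_lines") // one JSON object per line

The part files under emp_lines are then in the format that jsonFile/read.json expects.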

Note that SQLContext.jsonFile is deprecated; use SQLContext.read.json instead.

Examples from the Spark documentation
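
For reference, the non-deprecated call looks like this (assuming the file has already been rewritten to one object per line):

val df = sqlc.read.json("file:///home/vm/Downloads/emp.json")
df.show()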


Thanks Pranav for making the concept clearer for me, but I am confused about how to meet this JSON requirement, please help me. Manually converting all the JSON to one object per line is the harder task. – Krish


Here is the answer that cracked it:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{col, explode} // required for explode/col below

val files = sc.wholeTextFiles("file:///home/vm/Downloads/emp.json")
val jsonData = files.map(x => x._2)                 // the whole file as a single JSON string
val hc = new HiveContext(sc)
val fuldf = hc.jsonRDD(jsonData)                    // infer the schema of the wrapping object
val dfemp = fuldf.select(explode(col("Employees"))) // one row (column _c0) per employee struct
dfemp.saveAsTable("empdummy")
val df = hc.sql("select * from empdummy")
df.select("_c0.userId","_c0.jobTitleName","_c0.firstName","_c0.lastName","_c0.preferredFullName","_c0.employeeCode","_c0.region","_c0.phoneNumber","_c0.emailAddress").saveAsTable("dummytab")

Any suggestions for optimizing the above code?
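
One possible tightening, as a sketch under the same assumptions (Spark 1.4+, with hc and jsonData as defined above): skip the intermediate empdummy table and alias the exploded column so its fields can be selected directly before saving to Hive.

import org.apache.spark.sql.functions.{col, explode}

// Alias the exploded struct as "e" so its fields have a stable, readable name.
val emp = hc.jsonRDD(jsonData).select(explode(col("Employees")).as("e"))
emp.select("e.userId", "e.jobTitleName", "e.firstName", "e.lastName",
  "e.preferredFullName", "e.employeeCode", "e.region", "e.phoneNumber",
  "e.emailAddress").write.saveAsTable("dummytab")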
