Your JSON data seems to be malformed, i.e. it can't be read into a valid dataframe with spark.read.json("myfile.json"). A similar problem can be solved by reading the whole file with the wholeTextFiles API and cleaning it up:
val rdd = sc.wholeTextFiles("myfile.json")
val json = rdd.flatMap(_._2
  .replace(":\n", ":")   // pull each value onto the same line as its key
  .replace(",\n", "")    // drop comma+newline pairs
  .replace("}\n", "}")   // drop newlines after closing braces
  .replace(" ", "")      // strip all remaining spaces
  .replace("}{", "}\n{") // put each top-level object on its own line
  .split("\n"))
This should give you an RDD whose elements are valid JSON strings:
{"Location":{"filter":{"name":"houston","Disaster":"hurricane"}}}
{"Location":{"filter":{"name":"florida","Disaster":"hurricane"}}}
{"Location":{"filter":{"name":"seattle"}}}
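If you want to sanity-check the replace chain before running it on the cluster, you can exercise it on a small sample in plain Scala; the `raw` layout below is an assumption about how the broken file looks (each value pushed onto the line after its key, objects concatenated in one file):

```scala
// Pure-Scala check of the cleanup chain above, no Spark needed.
// `raw` is an assumed sample of the broken layout.
object CleanupDemo {
  val raw: String =
    "{\"Location\":\n{\"filter\":\n{\"name\":\n\"houston\", \"Disaster\":\n\"hurricane\"}}}\n" +
    "{\"Location\":\n{\"filter\":\n{\"name\":\n\"seattle\"}}}\n"

  def clean(s: String): Array[String] =
    s.replace(":\n", ":")   // pull values onto their key's line
      .replace(",\n", "")   // drop comma+newline pairs
      .replace("}\n", "}")  // drop newlines after closing braces
      .replace(" ", "")     // strip all remaining spaces
      .replace("}{", "}\n{") // one top-level object per line
      .split("\n")

  def main(args: Array[String]): Unit =
    clean(raw).foreach(println)
  // prints:
  // {"Location":{"filter":{"name":"houston","Disaster":"hurricane"}}}
  // {"Location":{"filter":{"name":"seattle"}}}
}
```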
Now you can read the JSON RDD into a dataframe:
val df = sqlContext.read.json(json)
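As a side note, the RDD overload of read.json was deprecated in Spark 2.2; on Spark 2.x the same read goes through a Dataset[String]. A minimal sketch, assuming a SparkSession named `spark` and the `json` RDD built above:

```scala
// Spark 2.x sketch: feed the cleaned lines to the JSON reader as a
// Dataset[String] (assumes `spark` is a SparkSession and `json` is the
// RDD[String] from above).
import spark.implicits._
val df = spark.read.json(json.toDS())
```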
This should give you:
+---------------------+
|Location |
+---------------------+
|[[hurricane,houston]]|
|[[hurricane,florida]]|
|[[null,seattle]] |
+---------------------+
with the schema:
root
|-- Location: struct (nullable = true)
| |-- filter: struct (nullable = true)
| | |-- Disaster: string (nullable = true)
| | |-- name: string (nullable = true)
Now that you have a valid dataframe, you can apply whatever filter you need:
val newTable = df.filter($"Location.filter.Disaster".isNotNull)
newTable will be:
+---------------------+
|Location |
+---------------------+
|[[hurricane,houston]]|
|[[hurricane,florida]]|
+---------------------+