5
我使用的Spark 2.2,我想讀卡夫卡的JSON消息行,它們變換爲DataFrame
並將它們作爲一個:這個我jsontostructs在火花結構流
spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic")
.load()
.select(col("value").cast(StringType).as("col"))
.writeStream()
.format("console")
.start();
可以實現:
+--------------------+
| col|
+--------------------+
|{"myField":"somet...|
+--------------------+
我想要更多的東西是這樣的:
+--------------------+
| myField|
+--------------------+
|"something" |
+--------------------+
我試着用struct
使用from_json
功能:
DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("myField", DataTypes.StringType)
}
)
,但我只得到:
+--------------------+
| jsontostructs(col)|
+--------------------+
|[something] |
+--------------------+
然後我試圖使用explode
但我只拿到了異常說:
cannot resolve 'explode(`col`)' due to data type mismatch:
input to function explode should be array or map type, not
StructType(StructField(...
任何想法如何使這項工作?