All I want to do is extract some information from an RDD and put it into a DataFrame, using Spark (Scala). How can I extract the RDD contents and load them into a DataFrame with Spark (Scala)?
What I have done so far is build a streaming pipeline that connects to a Kafka topic and puts the topic's contents into an RDD:
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("vittorio")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
val row = stream.map(record => record.value)
row.foreachRDD { (rdd: RDD[String], time: Time) =>
  rdd.collect.foreach(println)
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  import spark.implicits._
  val DF = rdd.toDF()
  DF.show()
}
ssc.start() // Start the computation
ssc.awaitTermination()
}
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _
  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}
Now, the content of my RDD is:
{"event":"bank.legal.patch","ts":"2017-04-15T15:18:32.469+02:00","svc":"dpbank.stage.tlc-1","request":{"ts":"2017-04-15T15:18:32.993+02:00","aw":"876e6d71-47c4-40f6-8c49-5dbd7b8e246b","end_point":"/bank/v1/legal/mxHr+bhbNqEwFvXGn4l6jQ==","method":"PATCH","app_instance":"e73e93d9-e70d-4873-8f98-b00c6fe4d036-1491406011","user_agent":"Dry/1.0.st/Android/5.0.1/Sam-SM-N910C","user_id":53,"user_ip":"151.14.81.82","username":"7cV0Y62Rud3MQ==","app_id":"db2ffeac6c087712530981e9871","app_name":"DrApp"},"operation":{"scope":"mdpapp","result":{"http_status":200}},"resource":{"object_id":"mxHr+bhbNqEwFvXGn4l6jQ==","request_attributes":{"legal_user":{"sharing_id":"mxHr+bhbNqEwFvXGn4l6jQ==","ndg":"","taxcode":"IQ7hUUphxFBXnI0u2fxuCg==","status":"INCOMPLETE","residence":{"city":"CAA","address":"Via Batto 44","zipcode":"926","country_id":18,"city_id":122},"business_categories":[5],"company_name":"4Gzb+KJk1XAQ==","vat_number":"162340159"}},"response_attributes":{"legal_user":{"sharing_id":"mGn4l6jQ==","taxcode":"IQ7hFBXnI0u2fxuCg==","status":"INCOMPLETE","residence":{"city":"CATA","address":"Via Bllo 44","zipcode":"95126","country_id":128,"city_id":12203},"business_categories":[5],"company_name":"4GnU/Nczb+KJk1XAQ==","vat_number":"12960159"}}},"class":"DPAPI"}
and running val DF = rdd.toDF()
shows:
+--------------------+
| value|
+--------------------+
|{"event":"bank.le...|
+--------------------+
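Since each record arrives as a single JSON string in the "value" column, one way to get real columns out of it is to parse that string with an explicit schema. Below is a minimal sketch, assuming Spark 2.1+ (where `from_json` is available); the object name `JsonParseSketch` is illustrative, the field paths are taken from the sample event above, and mapping `class` to `_type` is a guess based on the desired output:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

object JsonParseSketch {
  // Explicit schema for just the fields we care about; fields missing from a
  // record simply come back as null, so batch-to-batch payload differences
  // do not break parsing.
  val schema: StructType = new StructType()
    .add("request", new StructType().add("user_ip", StringType))
    .add("resource", new StructType()
      .add("response_attributes", new StructType()
        .add("legal_user", new StructType().add("status", StringType))))
    .add("class", StringType)

  // Turn the single-column DataFrame produced by rdd.toDF() into structured columns
  def parse(spark: SparkSession, raw: DataFrame): DataFrame =
    raw.select(from_json(col("value"), schema).as("j"))
      .select(
        col("j.request.user_ip").as("user_ip"),
        col("j.resource.response_attributes.legal_user.status").as("status"),
        col("j.class").as("_type"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("json-sketch").getOrCreate()
    import spark.implicits._
    // A trimmed-down version of the sample event
    val raw = Seq("""{"event":"bank.legal.patch","request":{"user_ip":"151.14.81.82"},"resource":{"response_attributes":{"legal_user":{"status":"INCOMPLETE"}}},"class":"DPAPI"}""").toDF("value")
    parse(spark, raw).show(false)
    spark.stop()
  }
}
```

The explicit schema also sidesteps part of the schema-consistency worry below: fields you did not declare are ignored, and declared fields that are absent become null.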
What I would like to achieve is a DataFrame that keeps being populated as new RDDs arrive from the stream. Some kind of union
approach, perhaps, but I don't know if that is the right way, because I'm not sure all the RDDs will have the same schema.
For example, this is what I would like to achieve:
+--------------------+------------+----------+-----+
| _id| user_ip| status|_type|
+--------------------+------------+----------+-----+
|AVtJFVOUVxUyIIcAklfZ|151.14.81.82|INCOMPLETE|DPAPI|
|AVtJFVOUVxUyIIcAklfZ|151.14.81.82|INCOMPLETE|DPAPI|
+--------------------+------------+----------+-----+
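Rather than holding a growing unioned DataFrame in memory, one option is to append each micro-batch to a persistent sink and read it back with schema merging. A minimal sketch, assuming Spark 2.2+ (where `spark.read.json` accepts a `Dataset[String]`); `BatchSink` and the sink path are hypothetical names, not part of any API:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

object BatchSink {
  // Hypothetical helper, meant to be called from inside foreachRDD
  def appendBatch(spark: SparkSession, rdd: RDD[String], path: String): Unit = {
    if (!rdd.isEmpty()) {
      import spark.implicits._
      // Let Spark infer this batch's schema from the JSON payload
      val batchDF = spark.read.json(spark.createDataset(rdd))
      // Append the batch to a Parquet sink; each batch may add new columns
      batchDF.write.mode(SaveMode.Append).parquet(path)
    }
  }
}
```

Reading everything back with `spark.read.option("mergeSchema", "true").parquet(path)` reconciles batch-to-batch schema drift, which is exactly the concern about RDDs not sharing the same schema.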
Thanks!