2017-03-06 59 views
0

目前在我的Scala項目中,我使用kafka通過spark-streaming來調用xml數據。我正在以簡單的方式:xml到Scala中的DataFrame

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaparams,topics) 
val lines = inputstream.map(_._2) 
lines.foreachRDD(rdd => { 
    rdd.foreach(record => { 
    println(record)} 
    )}  
) 

之後,我必須將我的字符串轉換爲DataFrame。爲此,我使用導出到.xml並使用sqlcontext.read.load(「pathtofile」)導入,並且一切正常。

問題:是否可以直接從kafka-rdd將單行xml-string轉換爲DataFrame,我該怎麼做?

回答

0

隨着xml loader使用,你可以實現你的目標,如:

import org.apache.spark.sql.SQLContext 

val sqlContext = new SQLContext(sc) 
val df = sqlContext.read 
    .format("com.databricks.spark.xml") 
    .option("rowTag", "<your tag>") 
    .load("<path to your xml>") 
+0

是的,我已經用這種方式來從文件導入現有的XML,正如我已經提到,它的偉大工程。但我的問題如何從rdd直接創建df,我已經從kafka接收xml的步驟:) –