2016-06-28 93 views
-1

我有幾個XML文件&我已經把這些放在卡夫卡主題&我已經創建了卡夫卡主題的Dstream對象。由於我想解析主題中的xml數據,因此我無法進一步處理。如果任何在Spark流中處理過xml處理的人都可以給我提供他們的幫助。我在過去的兩天裏一直堅持這一點。Kafka Spark流XML解析/處理

我採取的方法是XML文件 - >卡夫卡主題 - >在Spark流中處理 - >再次將它放回卡夫卡。

我能夠將數據放回卡夫卡話題,但無法處理或做火花流中主題的數據。

+0

你可以添加你的代碼,並具體問題是什麼以及你得到什麼錯誤或異常? – maasg

+0

@Harsha在閱讀來自kafka的消息時遇到了同樣的問題,即將消息作爲每個標記作爲消息來獲取。你能否讓我知道你是如何解決這個問題的。 –

+0

@ankush reddy使用JAXB來驗證XML的各自模式 – Harsha

回答

0

您期待什麼樣的處理?如果您期待任何一種數據提取,您可以做的是,foreach消息,將它們轉換爲json(xml to json非常簡單),並將jsonRDD和JsonRDD轉換爲DF直接轉換。因此,您將能夠在數據框上進行任何選擇或進行其他操作。

我需要你幾個輸入端,以提供準確的解決方案

1)你想出來的數據是什麼? 2)Dataframe中的數據是否足夠。?

如果你能夠解釋輸入,這將是非常有益的。

我已經添加了一個示例代碼來獲取數據框的XML數據。

val jsonStream = kafkaStream.transform(
     y => { 
     y.filter(x => x._1 != null && x._2 != null).map(x => { 
      XML.toJSONObject(x).toString(4); 
     } 
     ) 
     }) 


jsonStream.foreachRDD(x => { 
     val sqlContext = SQLContextSingleton.getInstance(x.sparkContext) 
     if (x != null) { 

     val df = sqlContext.read.json(x) 
     // Your DF Operations 
     } 
     } 
    } 
) 

object SQLContextSingleton { 

    @transient private var instance: HiveContext = _ 

    def getInstance(sparkContext: SparkContext): HiveContext = { 
    if (instance == null) { 
     sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false"); 
     sparkContext.hadoopConfiguration.set("spark.sql.parquet.mergeSchema", "true"); 
     sparkContext.hadoopConfiguration.set("spark.sql.parquet.cacheMetadata","false"); 
     instance = new HiveContext(sparkContext) 
    } 
    instance 
    } 
} 
+0

嗨Srini,感謝您的快速回復。這個問題已經解決,這是一個非常複雜的用例,我們想要使用火花流連接3種類型的xml。終於完成了。我們使用JAXB來驗證xml的各自的模式。正如我所說的,這個用例非常複雜,有很多編碼,因此我沒有分享任何適合我的代碼。再一次感謝你。 – Harsha