2017-09-01 537 views
1

我正在嘗試使用Spark結構化流從卡夫卡主題讀取XML數據。如何從Kafka讀取XML格式的流數據?

我試過使用Databricks spark-xml包,但是我收到一個錯誤消息,說這個包不支持流式閱讀。有什麼方法可以使用結構化流從Kafka主題中提取XML數據?

我當前的代碼:

df = spark \ 
     .readStream \ 
     .format("kafka") \ 
     .format('com.databricks.spark.xml') \ 
     .options(rowTag="MainElement")\ 
     .option("kafka.bootstrap.servers", "localhost:9092") \ 
     .option(subscribeType, "test") \ 
     .load() 

錯誤:

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load. 
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading 
     at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234) 

回答

3
.format("kafka") \ 
.format('com.databricks.spark.xml') \ 

最後一個與com.databricks.spark.xml勝變爲(隱藏卡夫卡作爲源)的數據流源。

換句話說,以上相當於.format('com.databricks.spark.xml')單獨。

正如您可能已經體驗的那樣,Databricks spark-xml包不支持流式讀取(即不能用作流式源)。該軟件包不適用於流媒體。

Is there any way I can extract XML data from Kafka topic using structured streaming?

您只需使用標準函數或UDF自行訪問和處理XML。在結構化流式傳輸到Spark 2.2.0中沒有內置的支持流式處理XML。

無論如何這應該不是什麼大問題。一個Scala代碼可能如下所示。

val input = spark. 
    readStream. 
    format("kafka"). 
    ... 
    load 

val values = input.select('value cast "string") 

val extractValuesFromXML = udf { (xml: String) => ??? } 
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value)) 

// print XMLs and numbers to the stdout 
val q = numbersFromXML. 
    writeStream. 
    format("console"). 
    start 

另一種可能的解決辦法是寫自己的自定義流Source將應對def getBatch(start: Option[Offset], end: Offset): DataFrame的XML格式。那應該工作。

+1

謝謝,亞採。我寫了UDF來解析XML數據。它正在工作。我將很快發佈該UDF。 –

1

不能混合格式這種方式。卡夫卡源加載爲包括值的數目,如keyvaluetopic,與value列存儲payload as a binary type

Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

...

value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.

解析該內容是用戶的責任,並且不能被委派給其他數據源。例如,請參閱我對How to read records in JSON format from Kafka using Structured Streaming?的回答。

對於XML,您可能需要一個UDF(UserDefinedFunction),但您可以先嚐試Hive XPath functions。您還應該解碼二進制數據。

2
import xml.etree.ElementTree as ET 
df = spark \ 
     .readStream \ 
     .format("kafka") \ 
     .option("kafka.bootstrap.servers", "localhost:9092") \ 
     .option(subscribeType, "test") \ 
     .load() 

然後我寫了一個Python UDF

def parse(s): 
    xml = ET.fromstring(s) 
    ns = {'real_person': 'http://people.example.com', 
     'role': 'http://characters.example.com'} 
    actor_el = xml.find("DNmS:actor",ns) 

    if(actor_el): 
     actor = actor_el.text 
    role_el.find('real_person:role', ns) 
    if(role_el): 
     role = role_el.text 
    return actor+"|"+role 

註冊此UDF

extractValuesFromXML = udf(parse) 

    XML_DF= df .withColumn("mergedCol",extractroot("value")) 

    AllCol_DF= xml_DF.withColumn("actorName", split(col("mergedCol"), "\\|").getItem(0))\ 
     .withColumn("Role", split(col("mergedCol"), "\\|").getItem(1))