
Spark Structured Streaming: multiple sinks

1) We consume from Kafka using Structured Streaming and write the processed dataset to S3. We would also like to write the processed data onward to Kafka; is it possible to do that from the same streaming query? (Spark version 2.1.1)

2) In the logs I can see the streaming query progress output, and I have a sample of the durations JSON from the logs below. Could someone clarify what addBatch and getBatch mean?

3) triggerExecution - does it include the time taken to process the fetched data and write it to the sink?

"durationMs" : { 
    "addBatch" : 2263426, 
    "getBatch" : 12, 
    "getOffset" : 273, 
    "queryPlanning" : 13, 
    "triggerExecution" : 2264288, 
    "walCommit" : 552 
    }, 

Regards, aravias

Answers


1) Yes.

In Spark 2.1.1, you can use writeStream.foreach to write data to Kafka. There is an example in this blog post: https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html
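A minimal sketch of that ForeachWriter pattern in Java, following the approach described in the blog post. The broker address, topic name, and string serialization here are placeholder assumptions, not part of the original answer:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.spark.sql.ForeachWriter;

    // Hypothetical writer that publishes each row of a Dataset<String> to Kafka.
    // Spark serializes the writer and calls open/process/close per partition,
    // so the (non-serializable) producer is created lazily in open().
    public class KafkaForeachWriter extends ForeachWriter<String> {
        private transient KafkaProducer<String, String> producer;

        @Override
        public boolean open(long partitionId, long version) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");   // placeholder broker
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
            return true;                                      // process this partition
        }

        @Override
        public void process(String value) {
            producer.send(new ProducerRecord<>("outTopic", value));  // placeholder topic
        }

        @Override
        public void close(Throwable errorOrNull) {
            if (producer != null) {
                producer.close();
            }
        }
    }

It would be attached to the query with something like `eventDataset.writeStream().foreach(new KafkaForeachWriter()).start()`.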

Or you can use Spark 2.2.0, which adds an official Kafka sink with support for writing to Kafka.
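With the built-in Kafka sink in 2.2.0, the same idea looks roughly like the fragment below; the broker address, topic name, and checkpoint path are placeholders, and it assumes the Dataset exposes its payload in a column named "value" (the default column name for a Dataset<String>):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.streaming.StreamingQuery;

    // eventDataset is assumed to be a streaming Dataset<String>; its single
    // column "value" is what the Kafka sink writes as the record value.
    StreamingQuery query = eventDataset
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")    // placeholder broker
            .option("topic", "outTopic")                         // placeholder topic
            .option("checkpointLocation", "/tmp/kafka-sink-chk") // placeholder checkpoint
            .start();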

2) getBatch measures the time taken to create a DataFrame from the source. This is usually fast. addBatch measures the time taken to run the DataFrame in the sink.

3) triggerExecution measures how long it takes to run one trigger's execution, which is usually roughly getOffset + getBatch + addBatch.


Thanks for the reply. Could you please clarify the following: when a Dataset created from the source topic is written to both S3 and Kafka, with a checkpoint specified separately for each sink, is it fair to expect that even though the same Dataset created from that source is written to these two different sinks, the data will be read from the source topic twice? – user2221654


If you have two sinks, that means you have two queries. Each query has its own Kafka consumer and fetches data from Kafka independently. – zsxwing
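To make that concrete, here is a sketch of the two-sink setup being discussed (broker address, topic names, S3 paths, and checkpoint locations are all placeholders): each call to start() creates an independent query with its own checkpoint and its own Kafka consumer reading the source topic.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;

    SparkSession spark = SparkSession.builder().appName("two-sinks").getOrCreate();

    // One logical source definition...
    Dataset<Row> source = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")   // placeholder broker
            .option("subscribe", "inTopic")                      // placeholder source topic
            .load();

    // ...but two queries: each re-reads the source topic with its own consumer.
    StreamingQuery toS3 = source
            .writeStream()
            .format("parquet")
            .option("path", "s3a://bucket/output")                   // placeholder S3 path
            .option("checkpointLocation", "s3a://bucket/chk-s3")     // placeholder checkpoint
            .start();

    StreamingQuery toKafka = source
            .selectExpr("CAST(value AS STRING) AS value")
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")
            .option("topic", "outTopic")                             // placeholder output topic
            .option("checkpointLocation", "s3a://bucket/chk-kafka")  // placeholder checkpoint
            .start();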


I have a problem in a similar scenario, where I am trying to write data to two Kafka sinks. I am getting a ClassCastException as shown below. The code looks like this:

final Dataset<String> eventDataset = feedMessageDataset
        .map(toEvent(nodeCodeToAliasBroadcast), OBSERVED_EVENT_ENCODER)
        .map(SparkFeedReader::serializeToJson, STRING());
final StreamingQuery eventQuery =
        kafkaStreamWriterForEvents(eventDataset, configuration, feedReaderEngineName).start();

final Dataset<String> splunkEventDataset = feedMessageDataset
        .map(toSplunkEvent(), SPLUNK_OBSERVED_EVENT_ENCODER)
        .filter(event -> !event.getIndicatorCode().equals(HEARBEAT_INDICATOR_CODE))
        .map(SparkFeedReader::serializeToJson, STRING());

final StreamingQuery splunkEventQuery =
        kafkaStreamWriterForSplunkEvents(splunkEventDataset, configuration, feedReaderEngineName).start();

If I comment out either one of the sinks, it works fine. This happens in Spark 2.2.0.

java.lang.ClassCastException: x.SplunkObservedEvent cannot be cast to x.ObservedEvent 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 
    at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 