Avoid multiple MongoDB connections with Spark Streaming
We have developed a Spark Streaming application that reads data from Kafka and writes it to MongoDB. We noticed a performance impact when creating the connection inside foreachRDD on the input DStream. The Spark application performs some validation before inserting into MongoDB. We are exploring options to avoid connecting to MongoDB for every processed message; instead, we would like to process all the messages within one batch interval at once. Below is a simplified version of the Spark Streaming application. One thing we tried is appending all messages to a DataFrame and inserting the contents of that DataFrame outside foreachRDD. But when we run this application, the code that writes the DataFrame to MongoDB never gets executed.
Note that I have commented out the part of the code inside foreachRDD that we used to insert each message into MongoDB. Since we insert one message at a time, the existing approach is very slow. Any suggestions for improving performance are greatly appreciated.
Thanks
package com.testing
import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql.{ SQLContext, Row, Column, DataFrame }
import java.util.HashMap
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.joda.time._
import org.joda.time.format._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import com.mongodb.util.JSON
import scala.io.Source._
import java.util.Properties
import java.util.Calendar
import scala.collection.immutable
import org.json4s.DefaultFormats
object Sample_Streaming {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("Sample_Streaming")
      .setMaster("local[4]")

    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sc, Seconds(1))

    val props = new HashMap[String, Object]()
    val bootstrap_server_config = "127.0.0.100:9092"
    val zkQuorum = "127.0.0.101:2181"
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val TopicMap = Map("sampleTopic" -> 1)
    val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2)

    val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
      .option("spark.mongodb.input.uri", "connectionURI")
      .option("spark.mongodb.input.collection", "schemaCollectionName")
      .load()

    val outSchema = schemaDf.schema
    var outDf = sqlContext.createDataFrame(sc.emptyRDD[Row], outSchema)

    KafkaDstream.foreachRDD { rdd =>
      rdd.collect().map { x =>
        val jsonInput: JValue = parse(x)

        /* Do all the transformations using Json libraries */
        val json4s_transformed = "transformed json"

        val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil)
        val df = sqlContext.read.schema(outSchema).json(rdd)

        // Earlier we were inserting each message into mongoDB, which we would like to avoid and process all at once
        /* df.write.option("spark.mongodb.output.uri", "connectionURI")
             .option("collection", "Collection")
             .mode("append").format("com.mongodb.spark.sql").save() */

        outDf = outDf.union(df)
      }
    }

    // Added this part of the code in expectation to access the unioned dataframe and insert all messages at once
    //println(outDf.count())
    if (outDf.count() > 0) {
      outDf.write
        .option("spark.mongodb.output.uri", "connectionURI")
        .option("collection", "Collection")
        .mode("append").format("com.mongodb.spark.sql").save()
    }

    // Run the streaming job
    ssc.start()
    ssc.awaitTermination()
  }
}
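For reference, here is a minimal sketch (not the poster's code) of the per-batch-interval write the question is aiming for: parse the whole micro-batch into one DataFrame inside foreachRDD and append it to MongoDB with a single write, so the connector is invoked once per batch interval rather than once per message. The connection URI and collection name are placeholders copied from the question, and the sketch assumes each Kafka record is a JSON string that already matches outSchema; the per-message validation/transformation is elided.

KafkaDstream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Build one DataFrame from the entire micro-batch on the executors,
    // instead of collect()-ing records to the driver and converting them one by one.
    val batchDf = sqlContext.read.schema(outSchema).json(rdd)

    // Single append per batch interval (placeholder URI and collection, as in the question).
    batchDf.write
      .option("spark.mongodb.output.uri", "connectionURI")
      .option("collection", "Collection")
      .mode("append")
      .format("com.mongodb.spark.sql")
      .save()
  }
}

The write has to stay inside foreachRDD: the guarded write on outDf in the code above sits on the driver before ssc.start(), so it executes exactly once during setup, while outDf is still empty, which is why it never appears to run.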
I'm confused by "we want to process all the messages in the DStream at once". DStreams are unbounded... do you mean processing all the messages within one batch interval at once? –
Yes, I meant processing all the messages within one batch interval at once. Sorry for the confusion. – Sid