2017-08-01 67 views
0

我們開發了一個火花流應用程序,它可以從kafka獲取數據並寫入mongoDB。在輸入DStream中創建foreachRDD內部的連接時,我們注意到性能影響。插入到mongoDB之前,Spark應用程序會進行一些驗證。我們正在探索避免爲每個處理的消息連接到mongoDB的選項,而我們希望一次處理一個批處理間隔內的所有消息。以下是Spark流應用程序的簡化版本。我們所做的一件事是將所有消息附加到數據框,並嘗試在foreachRDD之外插入該數據框的內容。但是當我們運行這個應用程序時,將數據框寫入mongoDB的代碼不會被執行。避免與火花流傳輸的mongoDB多連接

請注意,我註釋了foreachRDD中的一部分代碼,我們用它將每條消息插入到mongoDB中。由於我們一次插入一條消息,因此現有方法非常緩慢。任何關於性能改進的建議都非常感謝。

謝謝

package com.testing 

import org.apache.spark.streaming._ 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.streaming.{ Seconds, StreamingContext } 
import org.apache.spark.{ SparkConf, SparkContext } 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.sql.{ SQLContext, Row, Column, DataFrame } 
import java.util.HashMap 
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord } 
import scala.collection.mutable.ArrayBuffer 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types._ 

import org.joda.time._ 
import org.joda.time.format._ 

import org.json4s._ 
import org.json4s.JsonDSL._ 
import org.json4s.jackson.JsonMethods._ 
import com.mongodb.util.JSON 

import scala.io.Source._ 
import java.util.Properties 
import java.util.Calendar 

import scala.collection.immutable 
import org.json4s.DefaultFormats 


object Sample_Streaming { 

    def main(args: Array[String]) { 

    val sparkConf = new SparkConf().setAppName("Sample_Streaming") 
     .setMaster("local[4]") 

    val sc = new SparkContext(sparkConf) 
    sc.setLogLevel("ERROR") 

    val sqlContext = new SQLContext(sc) 
    val ssc = new StreamingContext(sc, Seconds(1)) 

    val props = new HashMap[String, Object]() 


    val bootstrap_server_config = "127.0.0.100:9092" 
    val zkQuorum = "127.0.0.101:2181" 



    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config) 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 

    val TopicMap = Map("sampleTopic" -> 1) 
    val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2) 

     val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") 
     .option("spark.mongodb.input.uri", "connectionURI") 
     .option("spark.mongodb.input.collection", "schemaCollectionName") 
     .load() 

     val outSchema = schemaDf.schema 
     var outDf = sqlContext.createDataFrame(sc.emptyRDD[Row], outSchema) 

    KafkaDstream.foreachRDD(rdd => rdd.collect().map { x => 
     { 
     val jsonInput: JValue = parse(x) 


     /*Do all the transformations using Json libraries*/ 

     val json4s_transformed = "transformed json" 

     val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil) 
     val df = sqlContext.read.schema(outSchema).json(rdd) 

//Earlier we were inserting each message into mongoDB, which we would like to avoid and process all at once  
/*  df.write.option("spark.mongodb.output.uri", "connectionURI") 
        .option("collection", "Collection") 
        .mode("append").format("com.mongodb.spark.sql").save()*/ 
     outDf = outDf.union(df) 

     } 

    } 

    ) 


     //Added this part of the code in expectation to access the unioned dataframe and insert all messages at once 
     //println(outDf.count()) 
     if(outDf.count() > 0) 
     { 
     outDf.write 
        .option("spark.mongodb.output.uri", "connectionURI") 
        .option("collection", "Collection") 
        .mode("append").format("com.mongodb.spark.sql").save() 
     } 


    // Run the streaming job 
    ssc.start() 
    ssc.awaitTermination() 
    } 

} 
+1

我很困惑「我們希望一次處理DStream中的所有消息」。 DStreams是無限的......你的意思是一次處理一個批處理間隔內的所有消息嗎? –

+0

是的我打算一次處理一個批處理間隔內的所有消息。對不起,感到困惑 – Sid

回答

1

這聽起來像你想連接的數量減少到mongodb,爲了這個目的,你必須使用foreachPartition代碼,當你成爲連接做的MongoDB看到spec,代碼將看起來像這樣:

rdd.repartition(1).foreachPartition { 
    //get instance of connection 
    //write/read with batch to mongo 
    //close connection 
} 
+0

感謝您的輸入。我會試試看。 – Sid