2017-08-25 98 views
1

我們正在開發一個Spark流式ETL應用程序,它將從Kafka獲取數據,應用必要的轉換並將數據加載到MongoDB中。從Kafka收到的數據是JSON格式。根據從MongoDB獲取的查找數據,轉換應用於RDD的每個元素(JSON字符串)。由於查找數據發生變化,我需要爲每個批處理間隔提取它。查找數據是使用MongoDB中的SqlContext.read讀取的。由於SqlContext不可序列化,所以我無法在DStream.transform中使用SqlContext.read,因此我無法將其廣播到工作節點。現在我嘗試使用DStream.foreachRDD,其中我從MongoDB獲取數據並將查找數據廣播給工作人員。 RDD元素上的所有轉換均在rdd.map閉包內執行,該閉包利用廣播數據並執行轉換並返回RDD。然後將RDD轉換爲數據框並寫入MongoDB。目前,這個應用程序運行速度很慢。 PS:如果我移動從DStream.foreachRDD中提取查找數據的代碼部分,並添加DStream.transform來應用轉換並讓DStream.foreachRDD僅將數據插入到MongoDB中,則性能非常好。但是採用這種方法,查找數據不會爲每個批處理間隔更新。使用外部數據轉換DStream RDD

我在尋求幫助以瞭解這是否是一種好方法,我正在尋找一些提高性能的指導。

下面是一個僞代碼

package com.testing 


object Pseudo_Code { 
    def main(args: Array[String]) { 

    val sparkConf = new SparkConf().setAppName("Pseudo_Code") 
     .setMaster("local[4]") 

    val sc = new SparkContext(sparkConf) 
    sc.setLogLevel("ERROR") 

    val sqlContext = new SQLContext(sc) 
    val ssc = new StreamingContext(sc, Seconds(1)) 

    val mongoIP = "127.0.0.1:27017" 

    val DBConnectionURI = "mongodb://" + mongoIP + "/" + "DBName" 

    val bootstrap_server_config = "127.0.0.100:9092" 
    val zkQuorum = "127.0.0.101:2181" 

    val group = "streaming" 

    val TopicMap = Map("sampleTopic" -> 1) 


    val KafkaDStream = KafkaUtils.createStream(ssc, zkQuorum, group, TopicMap).map(_._2) 

    KafkaDStream.foreachRDD{rdd => 
     if (rdd.count() > 0) { 

     //This lookup data has information required to perform transformation 
     //This information keeps changing, so the data should be fetched for every batch interval 

     val lookup1 = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") 
     .option("spark.mongodb.input.uri", DBConnectionURI) 
     .option("spark.mongodb.input.collection", "lookupCollection1") 
     .load() 

     val broadcastLkp1 = sc.broadcast(lookup1) 

     val new_rdd = rdd.map{elem => 
     val json: JValue = parse(elem) 

     //Foreach element in rdd, there are some values that should be looked up from the broadcasted lookup data 
     //"value" extracted from json 
     val param1 = broadcastLkp1.value.filter(broadcastLkp1.value("key")==="value").select("param1").distinct() 
     val param1ReplaceNull = if(param1.count() == 0){ 
            "constant" 
           } 
           else{ 
            param1.head().getString(0) 
           } 
     //create a new JSON with a different structure 
     val new_JSON = """""" 

     compact(render(new_JSON)) 
    } 

    val targetSchema = new StructType(Array(StructField("key1",StringType,true) 
                ,StructField("key2",TimestampType,true))) 
    val transformedDf = sqlContext.read.schema(targetSchema).json(new_rdd) 


    transformedDf.write 
      .option("spark.mongodb.output.uri",DBConnectionURI) 
      .option("collection", "tagetCollectionName") 
      .mode("append").format("com.mongodb.spark.sql").save() 
     } 
    } 

    // Run the streaming job 
    ssc.start() 
    ssc.awaitTermination() 
    } 




} 
+0

這裏有一個有趣的問題。需要考慮的事情:你可以從你的kafka和你的mongoDB中流入嗎?如果是這樣的話,那麼你可以同時在兩個DStreams上工作。 –

+0

@MichelLemay你有一個關於如何從mongoDB流式傳輸的例子。我可以試試看。現在,我可以按照https://stackoverflow.com/questions/37638519/spark-streaming-how-to-periodically-refresh-cached-rdd中提供的一些說明稍微向前邁進一步。我創建了一個DStream.foreachRDD,其中我重新加載了查找數據,然後創建了DStream.transform,其中使用了查找數據並返回一個新的RDD,然後創建了另一個將數據插入到mongoDB中的foreachRDD。這有效,但表現非常糟糕。 – Sid

+0

您是否嘗試過在dataframe API中爲您的json轉換提供from_json函數?您可以嘗試結構化流式傳輸(如果您的驅動程序支持.writeStream)。 val msgSchema = Encoders.product [Message] .schema val ds = df .select(from_json($「value」.cast(「string」),msgSchema).as [Message]) – sgireddy

回答

0

經研究,該工作的解決方案是由工人讀取之後緩存廣播數據幀。以下是我爲改善性能所做的代碼更改。

val new_rdd = rdd.map{elem => 
     val json: JValue = parse(elem) 

     //Foreach element in rdd, there are some values that should be looked up from the broadcasted lookup data 
     //"value" extracted from json 
     val lkp_bd = broadcastLkp1.value 
     lkp_bd.cache() 
     val param1 = lkp_bd.filter(broadcastLkp1.value("key")==="value").select("param1").distinct() 
     val param1ReplaceNull = if(param1.count() == 0){ 
            "constant" 
           } 
           else{ 
            param1.head().getString(0) 
           } 
     //create a new JSON with a different structure 
     val new_JSON = """""" 

     compact(render(new_JSON)) 
    } 

請注意,這種方法在羣集上運行時有問題。訪問廣播數據幀時拋出空指針異常。我創建了另一個線程。 Spark Streaming - null pointer exception while accessing broadcast variable

+0

這可能是由於變量評估發生在廣播變量可用性之前的事實。您可能希望將變量標記爲懶惰,以便等到第一次使用時爲止。另一種選擇是通過直接從執行者訪問您需要的內容來完全避免廣播變量。對象GetMetaData {@transient lazy val metaData = getData; def getData = ...}關鍵是將你的對象標記爲瞬態,所以spark序列化器會忽略它並將它包裝在一個對象中。 – sgireddy