
I have a small scenario in which I read a text file, calculate averages per date, and store the summary in a MySQL database. After the insert into the MySQL database, the data in the dataset gets updated.

Below is the code:

val repo_sum = joined_data.map(SensorReport.generateReport)
repo_sum.show()  // STEP 1
repo_sum.write.mode(SaveMode.Overwrite).jdbc(url, "sensor_report", prop)
repo_sum.show()  // STEP 2

After the averages are calculated, the repo_sum dataframe at STEP 1 is:

+----------+------------------+-----+-----+
|      date|               flo|   hz|count|
+----------+------------------+-----+-----+
|2017-10-05|52.887049194476745|10.27|  5.0|
|2017-10-04|  55.4188048943416|10.27|  5.0|
|2017-10-03|  54.1529270444092|10.27| 10.0|
+----------+------------------+-----+-----+

Then the save command is executed, and at STEP 2 the values in the dataset are:

+----------+-----------------+------------------+-----+
|      date|              flo|                hz|count|
+----------+-----------------+------------------+-----+
|2017-10-05|52.88704919447673|31.578524597238367| 10.0|
|2017-10-04| 55.4188048943416| 32.84440244717079| 10.0|
+----------+-----------------+------------------+-----+

Below is the complete code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.streaming.{Seconds, StreamingContext}

class StreamRead extends Serializable {
  org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext.implicits._

    val sensorDStream = ssc.textFileStream("file:///C:/Users/M1026352/Desktop/Spark/StreamData").map(Sensor.parseSensor)
    val url = "jdbc:mysql://localhost:3306/streamdata"
    val prop = new java.util.Properties
    prop.setProperty("user", "root")
    prop.setProperty("password", "root")

    val tweets = sensorDStream.foreachRDD { rdd =>
      if (rdd.count() != 0) {
        // Current contents of the summary table in MySQL.
        val databaseVal = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/streamdata", "sensor_report", prop)

        // Per-date sums for the current micro-batch.
        val rdd_group = rdd.groupBy { x => x.date }
        val repo_data = rdd_group.map { x =>
          val sum_flo = x._2.map { x => x.flo }.reduce(_ + _)
          val sum_hz = x._2.map { x => x.hz }.reduce(_ + _)
          val sum_flo_count = x._2.size
          print(sum_flo_count)
          SensorReport(x._1, sum_flo, sum_hz, sum_flo_count)
        }

        // Merge the batch sums with the stored report and write the result back.
        val df = repo_data.toDF()
        val joined_data = df.join(databaseVal, Seq("date"), "fullouter")
        joined_data.show()
        val repo_sum = joined_data.map(SensorReport.generateReport)
        repo_sum.show()
        repo_sum.write.mode(SaveMode.Overwrite).jdbc(url, "sensor_report", prop)
        repo_sum.show()
      }
    }

    ssc.start()
    WorkerAndTaskExample.main(args)
    ssc.awaitTermination()
  }

  case class Sensor(resid: String, date: String, time: String, hz: Double, disp: Double, flo: Double, sedPPM: Double, psi: Double, chlPPM: Double)

  object Sensor extends Serializable {
    def parseSensor(str: String): Sensor = {
      val p = str.split(",")
      Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8).toDouble)
    }
  }

  case class SensorReport(date: String, flo: Double, hz: Double, count: Double)

  object SensorReport extends Serializable {
    def generateReport(row: Row): SensorReport = {
      print(row)
      if (row.get(4) == null) {
        // Date only present in the current batch: turn the sums into averages.
        SensorReport(row.getString(0), row.getDouble(1) / row.getDouble(3), row.getDouble(2) / row.getDouble(3), row.getDouble(3))
      } else if (row.get(2) == null) {
        // Date only present in the stored report: keep the stored values.
        SensorReport(row.getString(0), row.getDouble(4), row.getDouble(5), row.getDouble(6))
      } else {
        // Date present in both: combine into a weighted average.
        val count = row.getDouble(3) + row.getDouble(6)
        val flow_avg_update = (row.getDouble(6) * row.getDouble(4) + row.getDouble(1)) / count
        val flow_flo_update = (row.getDouble(6) * row.getDouble(5) + row.getDouble(1)) / count
        print(count + " : " + flow_avg_update + " : " + flow_flo_update)
        SensorReport(row.getString(0), flow_avg_update, flow_flo_update, count)
      }
    }
  }
}

As far as I understand, when the save command is executed in Spark the whole process runs again. Is my understanding correct? Please let me know.


Try caching the RDD and see if it still happens: val repo_sum = joined_data.map(SensorReport.generateReport).cache(). – Shaido


Yes, val repo_sum = joined_data.map(SensorReport.generateReport).cache() works well. –


Added a more complete answer to the question. – Shaido

Answer


In Spark, all transformations are lazy: nothing is executed until an action is called. This also means that if multiple actions are called on the same RDD or dataframe, all of the computations are performed multiple times. That includes loading the data and all of the transformations.
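
To make the recomputation visible, here is a small standalone sketch (the names and data are illustrative, not taken from the question): an accumulator counts how often the map function actually runs, and each action on the uncached dataset runs it again.

import org.apache.spark.sql.SparkSession

object LazyRecomputeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("LazyRecomputeDemo")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Counts how many times the map function below is actually evaluated.
    val evaluations = spark.sparkContext.longAccumulator("evaluations")

    // map() is a transformation: nothing runs until an action is called.
    val doubled = spark.sparkContext.parallelize(1 to 5).toDS().map { x =>
      evaluations.add(1)
      x * 2
    }

    doubled.show()                                       // first action: 5 evaluations
    println(s"after first show:  ${evaluations.value}")
    doubled.show()                                       // second action: the map runs again, 10 in total
    println(s"after second show: ${evaluations.value}")

    spark.stop()
  }
}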

To avoid this, use cache() or persist() (persist() additionally lets you specify the type of storage to use; by default the data is kept in memory). After the first action, cache() keeps the RDD/dataframe in memory, so the same transformations are not run multiple times.
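
For reference, a minimal sketch (with illustrative data) of persisting a dataframe; cache() is simply persist() with the default storage level (MEMORY_AND_DISK for DataFrames/Datasets, MEMORY_ONLY for plain RDDs).

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object PersistExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PersistExample")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    val report = Seq(("2017-10-05", 52.9), ("2017-10-04", 55.4)).toDF("date", "flo")

    // persist() with an explicit storage level; report.cache() would be the
    // shorthand for persisting with the default level.
    report.persist(StorageLevel.MEMORY_AND_DISK)

    report.count()     // first action materializes and stores the rows
    report.show()      // served from the persisted copy, no recomputation

    report.unpersist() // release the storage once the data is no longer needed
    spark.stop()
  }
}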


In this case, since it is the two actions on the dataframe that cause this unexpected behaviour, caching the dataframe solves the problem:

val repo_sum = joined_data.map(SensorReport.generateReport).cache()
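
For completeness, this is roughly how the fix slots into the foreachRDD loop from the question (only the relevant lines are shown; the unpersist() at the end is an optional addition that frees the cached data before the next micro-batch):

val repo_sum = joined_data.map(SensorReport.generateReport).cache()

repo_sum.show()   // STEP 1: triggers the computation, results are cached
repo_sum.write.mode(SaveMode.Overwrite).jdbc(url, "sensor_report", prop)
repo_sum.show()   // STEP 2: served from the cache, so the values match STEP 1

repo_sum.unpersist()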