0

我正在使用Spark JDBC從MS SQL數據庫讀取數據,但我得到了一些奇怪的結果。Apache Spark JDBC DataFrame計數問題

例如,下面是我的代碼來從我的MS SQL數據庫中讀取記錄。 請注意,我正在讀取數據的表格不斷插入記錄。

//Extract Data from JDBC source 
    val jdbcTable = sqlContext.read.format("jdbc").options(
     Map(
     "url" -> jdcbUrl, 
     "driver" -> "net.sourceforge.jtds.jdbc.Driver", 
     "dbtable" -> 
      s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t")) 
     .load 

    println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}") 

    val updateJdbcDF = jdbcTable 
     .withColumn("ID-COL1", trim($"COl1")) 
     .withColumn("ID-COL1", trim($"COl2")) 

    println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}") 

我一次比一次我跑我的程序,例如我總是${updateJdbcDF.count()}計數>${jdbcTable.count()} 2個不同的計數值。

有人可以解釋我爲什麼會發生這種情況?這在我的用例中創造了很多問題。如何在創建DataFrame後限制數量爲jdbcTable。我試過jdbcTable.cache(),但沒有運氣。

當我對來自jdbcTable DataFrame的其他數據幀使用任何操作時,記錄只會變得越來越大。每次使用從jdbcTable數據框派生的任何數據幀時,是否會調用jdbcTable數據幀?

+0

差異是否不變?或者你每次都得到不同的結果嗎? – philantrovert

+0

@philantrovert沒有區別是不恆定的,我每次都得到不同的計數。 – nilesh1212

+1

那麼,如果「我正在讀取數據的表格不斷插入記錄」,並且您的請求沒有定義一個固定範圍謂詞,那麼每次火花訪問它時,表中的行數都不相同。所以你所看到的(改變計數)只是預料之中,不是嗎? – GPI

回答

1

我能夠通過應用jdbcTable.cache()來解決這個問題,現在任何派生自jdbcTable數據框的DF都不會給我一個比jdbcTable.count()更高的計數。所有的計算現在都可以。感謝您的解釋@GPI

//Extract Data from JDBC source 
    val jdbcTable = sqlContext.read.format("jdbc").options(
     Map(
     "url" -> jdcbUrl, 
     "driver" -> "net.sourceforge.jtds.jdbc.Driver", 
     "dbtable" -> 
      s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t")) 
     .load 

    jdbcTable.cache() 

    println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}") 


    val updateJdbcDF = jdbcTable 
     .withColumn("ID-COL1", trim($"COl1")) 
     .withColumn("ID-COL1", trim($"COl2")) 

    println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}") 
    /** 
    * MORE DATA PROCESSING HERE 
    /** 

    jdbcTable.unpersist()