2017-08-10 64 views
1

我正在嘗試讀取一個大表來觸發(〜100M行)。該表是PostgreSQL和我們閱讀如下:從PostgreSQL讀取100M行到Spark並寫入實木複合地址

val connectionProperties = new Properties() 
connectionProperties.put("user", "$USER") 
connectionProperties.put("password", "$PASSWORD") 

// val connection = DriverManager.getConnection("$SERVER", "$USER", "$PASSWORD") 
//connection.isClosed() 

val jdbc_url = s"jdbc:postgresql://${"$HOST"}:${$PORT}/${"$DB"}" 
val df = spark.read.option("inferSchema", true).jdbc(jdbc_url, "$TABLE", connectionProperties) 

但是,我們的SQL表中有2列作爲貨幣數據類型(格式爲$ 100,000.23)。在火花中讀取時,它會轉換爲雙精度和拋出異常。

我們嘗試過: a)將列數據轉換爲Double。但這並沒有幫助,因爲Spark也自動投入翻倍。它在價值觀上有問題。 (使用 b)數據已經以逗號在DataFrame中。試圖使用PostgreSQL方言(https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala

任何幫助將非常感激。

+0

您是否嘗試調用SELECT查詢,並將該lame Money數據類型轉換爲更易於使用的數據類型*之前* Spark接收數據?例如。 '(選擇A,B,鑄造(C爲十進制(16,2))作爲來自TBL的CC)x' –

+0

還有'decimal(,< - 十進制數字><)'避免了不可避免的舍入誤差浮點類型。並用財務用例玩地獄...... –

回答

1

您可以嘗試手動指定模式,將該列讀取爲字符串,然後手動分析定義用戶定義函數的值。

要手動指定的模式,你需要寫這樣的事情

val schema = 
    StructType(
    StructField("your-example-column1", IntegerType, true) :: 
    StructField("your-money-column", StringType, true) :: Nil) 
    spark.read.schema(schema) 

見星火斯卡拉API:

要了解更多關於如何轉換StringType爲您需要的數據類型fer to this question

+1

在生產環境中推斷Schema並不是一個好習慣。您應該始終手動指定模式。 –

+0

正如僞代碼中提到的,我們已經嘗試過使用withColumn和selectExpr來更改數據在Dataframe中的數據類型。由於double對逗號值給予例外,所以不起作用。 – Pranab

+0

它不應該起作用,因爲數據框無法推斷出您的類型。相反,您應該在將數據讀取到數據框之前指定模式_before_。 – addmeaning

相關問題