2017-04-15 23 views
0

下面是我的spark/SCALA程序來讀取我的源文件。 (CSV文件)在apache spark中用模式解析文件

val csv = spark.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") //reading the headers 
// .option("mode", "DROPMALFORMED") 
    .option("inferSchema", "true") 

    .load("C:\\TestFiles\\SAP_ENT_INVBAL.csv"); //.csv("csv/file/path") //spark 2.0 api 


csv.show() 



csv.printSchema() 
csv.show() 

}

輸出包含文件頭,但我處理,我需要不同的命名約定,而不是文件頭。

我已經嘗試了幾個選項,並且效果很好。

  1. 重命名數據框列
  2. 使用添加(StructField功能

但我想我的代碼是通用的,只要通過架構文件在讀取文件,並創建根據列數據框到架構文件。

請幫助解決這個問題。

回答

0

這裏是例如,從火花CSV文件關於如何指定自定義schema-

可以讀取數據時手動指定架構:

import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType} 

val sqlContext = new SQLContext(sc)  
val customSchema = StructType(Array(
    StructField("year", IntegerType, true), 
    StructField("make", StringType, true), 
    StructField("model", StringType, true), 
    StructField("comment", StringType, true), 
    StructField("blank", StringType, true))) 

val df = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") // Use first line of all files as header 
    .schema(customSchema) 
    .load("cars.csv") 
+0

但是,如何將customSchema存儲在文件中並將其傳遞給.schema? – aks

0

如果你只是需要重命名列,您可以使用toDF方法,傳遞新的名稱列,例如

val csv = spark.read.option("header", "true") 
    .csv("C:\\TestFiles\\SAP_ENT_INVBAL.csv") 
    .toDF("newColAName", "newColBName", "newColCName")