
Apache Spark Dataset API - schema StructType not accepted

I have the following class, which loads a headerless CSV file using the Spark Dataset API.

My problem is that I cannot get the SparkSession to accept the schema StructType that should define each column. The resulting DataFrame consists of unnamed columns of type string.

public class CsvReader implements java.io.Serializable {

    public CsvReader(StructType builder) {
        this.builder = builder;
    }

    private StructType builder;

    SparkConf conf = new SparkConf().setAppName("csvParquet").setMaster("local");
    // create Spark context
    SparkContext context = new SparkContext(conf);
    // create Spark session
    SparkSession sparkSession = new SparkSession(context);

    Dataset<Row> df = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", false)
            //.option("inferSchema", true)
            .schema(builder)
            .load("/Users/Chris/Desktop/Meter_Geocode_Data.csv"); //TODO: CMD line arg

    public void printSchema() {
        System.out.println(builder.length());
        df.printSchema();
    }

    public void printData() {
        df.show();
    }

    public void printMeters() {
        df.select("meter").show();
    }

    public void printMeterCountByGeocode_result() {
        df.groupBy("geocode_result").count().show();
    }

    public Dataset getDataframe() {
        return df;
    }

}

The schema of the resulting DataFrame is:

root 
|-- _c0: string (nullable = true) 
|-- _c1: string (nullable = true) 
|-- _c2: string (nullable = true) 
|-- _c3: string (nullable = true) 
|-- _c4: string (nullable = true) 
|-- _c5: string (nullable = true) 
|-- _c6: string (nullable = true) 
|-- _c7: string (nullable = true) 
|-- _c8: string (nullable = true) 
|-- _c9: string (nullable = true) 
|-- _c10: string (nullable = true) 
|-- _c11: string (nullable = true) 
|-- _c12: string (nullable = true) 
|-- _c13: string (nullable = true) 

Debugging shows that the 'builder' StructType is correctly defined:

0 = StructField(geocode_result,DoubleType,false)
1 = StructField(meter,StringType,false)
2 = StructField(orig_easting,StringType,false)
3 = StructField(orig_northing,StringType,false)
4 = StructField(temetra_easting,StringType,false)
5 = StructField(temetra_northing,StringType,false)
6 = StructField(orig_address,StringType,false)
7 = StructField(orig_postcode,StringType,false)
8 = StructField(postcode_easting,StringType,false)
9 = StructField(postcode_northing,StringType,false)
10 = StructField(distance_calc_method,StringType,false)
11 = StructField(distance,StringType,false)
12 = StructField(geocoded_address,StringType,false)
13 = StructField(geocoded_postcode,StringType,false)

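For reference, a schema matching that debugger output would typically be built along these lines (a sketch; the field names and types come from the output above, but the construction code itself is not shown in the question and is assumed):

StructType builder = new StructType(new StructField[] {
    // field names and types as listed in the debugger output
    DataTypes.createStructField("geocode_result", DataTypes.DoubleType, false),
    DataTypes.createStructField("meter", DataTypes.StringType, false),
    DataTypes.createStructField("orig_easting", DataTypes.StringType, false),
    DataTypes.createStructField("orig_northing", DataTypes.StringType, false),
    DataTypes.createStructField("temetra_easting", DataTypes.StringType, false),
    DataTypes.createStructField("temetra_northing", DataTypes.StringType, false),
    DataTypes.createStructField("orig_address", DataTypes.StringType, false),
    DataTypes.createStructField("orig_postcode", DataTypes.StringType, false),
    DataTypes.createStructField("postcode_easting", DataTypes.StringType, false),
    DataTypes.createStructField("postcode_northing", DataTypes.StringType, false),
    DataTypes.createStructField("distance_calc_method", DataTypes.StringType, false),
    DataTypes.createStructField("distance", DataTypes.StringType, false),
    DataTypes.createStructField("geocoded_address", DataTypes.StringType, false),
    DataTypes.createStructField("geocoded_postcode", DataTypes.StringType, false)
});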
What am I doing wrong? Any help greatly appreciated!

Answers


Declare the variable Dataset<Row> df as a field and move the code block that reads the CSV file into the getDataframe() method, as below. (In your class, the df field initializer runs before the constructor body assigns builder, so schema(builder) is called with null and Spark falls back to the default unnamed string columns.)

private Dataset<Row> df = null;

public Dataset<Row> getDataframe() {
    df = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", false)
            //.option("inferSchema", true)
            .schema(builder)
            .load("src/main/java/resources/test.csv"); //TODO: CMD line arg
    return df;
}

Now you can call it like below:

CsvReader cr = new CsvReader(schema);
Dataset<Row> df = cr.getDataframe();
cr.printSchema();

I suggest you redesign your class. One option is to pass df to the other methods as a parameter. Also, if you are using Spark 2.0, then you do not need SparkConf; please refer to the documentation on creating a SparkSession.
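For reference, in Spark 2.0+ the session can be obtained directly from the builder, without creating a SparkConf or SparkContext first (a minimal sketch; the app name and master are taken from the question's code):

// Spark 2.0+: SparkSession.builder() replaces the SparkConf/SparkContext setup
SparkSession sparkSession = SparkSession.builder()
        .appName("csvParquet")
        .master("local")
        .getOrCreate();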


Great! Many thanks!


You should initialize your df in the constructor if you want to initialize it through builder, or you can put the initialization in a member function.
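A minimal sketch of the constructor-based variant, reusing the question's fields and file path, with df declared as a plain uninitialized field (private Dataset<Row> df;):

public CsvReader(StructType builder) {
    this.builder = builder;
    // builder is assigned before the read, so schema(builder) receives the real schema
    this.df = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", false)
            .schema(builder)
            .load("/Users/Chris/Desktop/Meter_Geocode_Data.csv"); //TODO: CMD line arg
}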