
When running the Java code below with Spark in local mode, I get the error "java.lang.IllegalArgumentException: object is not an instance of declaring class":

    at Datahub.run(Datahub.java:96)
    at Datahub.main(Datahub.java:64)
Caused by: java.lang.IllegalArgumentException: object is not an instance of declaring class
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

The logic reads a CSV file and saves it in Parquet format.

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import scala.Tuple2;

@SuppressWarnings("serial")
public class Datahub implements Serializable {

    private transient SparkConf sparkConf;
    private transient JavaSparkContext sparkContext;
    private transient SQLContext sqlContext;

    public Datahub() {
        // point Hadoop at the winutils directory before the Spark context is created
        System.setProperty("hadoop.home.dir", "C:/tools/spark");

        sparkConf = new SparkConf().setAppName("Datahub").setMaster("local");
        sparkContext = new JavaSparkContext(sparkConf);
        sqlContext = new SQLContext(sparkContext);
    }

    public static void main(String[] args) throws Exception {
        Datahub job = new Datahub();
        job.run("a", "b");
    }

    public void run(String t, String u) {
        JavaRDD<String> pairRDD = sparkContext.textFile("C:/temp/L1_result.csv");
        JavaPairRDD<String, String> rowJavaRDD = pairRDD.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String rec) {
                String[] tokens = rec.split(";");
                String[] vals = new String[tokens.length];
                for (int i = 0; i < tokens.length; i++) {
                    vals[i] = tokens[i];   // copied but never used afterwards
                }
                // keep only the first two columns as a key/value pair
                return new Tuple2<String, String>(tokens[0], tokens[1]);
            }
        });

        // rowJavaRDD.values() is an RDD of String, not of funds beans, which appears
        // to be what triggers the reflection error above
        Dataset<Row> fundDF = sqlContext.createDataFrame(rowJavaRDD.values(), funds.class);
        fundDF.printSchema();
        fundDF.show();
        fundDF.write().option("mergeschema", true).parquet("C:/test");
    }
}
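
For context, the funds bean passed to createDataFrame is not shown in the question. It is presumably a plain JavaBean along these lines (the field names k and something are assumptions inferred from the setters used in the answers below; it must be a public top-level or static nested class with a no-arg constructor so Spark can instantiate it by reflection):

// Assumed sketch of the funds bean, not part of the original post
public class funds implements Serializable {
    private String k;
    private String something;

    public String getK() { return k; }
    public void setK(String k) { this.k = k; }
    public String getSomething() { return something; }
    public void setSomething(String something) { this.something = something; }
}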

Can anyone help with this? It's just two columns that I'm trying to pass as input, and I'm not sure what is going wrong. – Ans8

Answers


Solved it with the following changes:

// build the funds beans directly and create the DataFrame from the typed list
funds b0 = new funds(); b0.setK("k0"); b0.setSomething("sth0");
funds b1 = new funds(); b1.setK("k1"); b1.setSomething("sth1");
List<funds> data = new ArrayList<funds>();
data.add(b0); data.add(b1);
Dataset<Row> fundDf = sqlContext.createDataFrame(data, funds.class);
fundDf.printSchema();
fundDf.write().option("mergeschema", true).parquet("C:/test");
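
If the rows should still come from the CSV file rather than being hard-coded, the same bean-based fix can be applied by mapping each semicolon-separated line to a funds object before calling createDataFrame. A rough sketch under that assumption, reusing sparkContext and sqlContext from the question (the column-to-field mapping is a guess):

// needs org.apache.spark.api.java.function.Function on the import list
JavaRDD<funds> fundsRDD = sparkContext.textFile("C:/temp/L1_result.csv")
        .map(new Function<String, funds>() {
            public funds call(String line) {
                String[] tokens = line.split(";");
                funds f = new funds();
                f.setK(tokens[0]);          // assumed: first column -> k
                f.setSomething(tokens[1]);  // assumed: second column -> something
                return f;
            }
        });
Dataset<Row> csvFundDf = sqlContext.createDataFrame(fundsRDD, funds.class);
csvFundDf.write().parquet("C:/test");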

You can read the file and convert it to the funds bean class as follows:

Dataset<funds> fundsDF = sqlContext.read().csv("C:/temp/L1_result.csv").as(Encoders.bean(funds.class));
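
Note that L1_result.csv is split on ";" in the question and appears to have no header row, so the separator and column names likely need to be set explicitly before the bean encoder can map the columns onto the funds properties. A hedged sketch, assuming the file holds exactly the two columns k and something:

// Assumed sketch: make the separator explicit and name the columns after the bean fields
Dataset<funds> fundsDS = sqlContext.read()
        .option("sep", ";")                 // the question splits records on ";"
        .option("header", "false")
        .csv("C:/temp/L1_result.csv")
        .toDF("k", "something")             // column names must match the bean properties
        .as(Encoders.bean(funds.class));
fundsDS.write().parquet("C:/test");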