2017-08-30 86 views
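I wrote code that reads a CSV file and maps all of its columns to a bean class. Now I am trying to put those values into a Dataset, and I am getting an error. How can I pass the CSV-mapped bean class to a Dataset?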

17/08/30 16:33:58 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalArgumentException: object is not an instance of declaring class
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 

If I set the values manually, it works fine.

public void run(String t, String u) throws FileNotFoundException {

    JavaRDD<String> pairRDD = sparkContext.textFile("C:/temp/L1_result.csv");
    JavaPairRDD<String, String> rowJavaRDD = pairRDD.mapToPair(new PairFunction<String, String, String>() {

        public Tuple2<String, String> call(String rec) throws FileNotFoundException {
            String[] tokens = rec.split(";");
            String[] vals = new String[tokens.length];
            for (int i = 0; i < tokens.length; i++) {
                vals[i] = tokens[i];
            }

            return new Tuple2<String, String>(tokens[0], tokens[1]);
        }
    });

    ColumnPositionMappingStrategy cpm = new ColumnPositionMappingStrategy();
    cpm.setType(funds.class);
    String[] csvcolumns = new String[]{"portfolio_id", "portfolio_code"};
    cpm.setColumnMapping(csvcolumns);

    CSVReader csvReader = new CSVReader(new FileReader("C:/temp/L1_result.csv"));

    CsvToBean csvtobean = new CsvToBean();
    List csvDataList = csvtobean.parse(cpm, csvReader);

    for (Object dataobject : csvDataList) {
        funds fund = (funds) dataobject;
        System.out.println("Portfolio:" + fund.getPortfolio_id() + " code:" + fund.getPortfolio_code());
    }

    /* funds b0 = new funds();
    b0.setK("k0");
    b0.setSomething("sth0");
    funds b1 = new funds();
    b1.setK("k1");
    b1.setSomething("sth1");
    List<funds> data = new ArrayList<funds>();
    data.add(b0);
    data.add(b1);*/

    System.out.println("Portfolio:" + rowJavaRDD.values());

    // manual set works fine
    // Dataset<Row> fundDf = SQLContext.createDataFrame(data, funds.class);
    Dataset<Row> fundDf = SQLContext.createDataFrame(rowJavaRDD.values(), funds.class);
    fundDf.printSchema();
    fundDf.write().option("mergeschema", true).parquet("C:/test");
}

The line below is the one that causes the problem, where rowJavaRDD.values() is used:

Dataset<Row> fundDf = SQLContext.createDataFrame(rowJavaRDD.values(), funds.class); 

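Is there a resolution? I assume some column mapping for the values has to be passed here, but how should that be done? Any ideas would really help me.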

Answer

Dataset fundDf = SQLContext.createDataFrame(csvDataList,funds.class);

Passing the list works!
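A likely reason the list works while rowJavaRDD.values() does not: the elements of rowJavaRDD.values() are plain String objects, whereas csvDataList holds funds beans. A bean-based reader calls the bean's getters reflectively on each element, and invoking a getter on a String fails with the same IllegalArgumentException seen in the log. The plain-Java sketch below reproduces this without Spark. The class Funds is a hypothetical stand-in for the question's funds bean, and readPortfolioId, parseLine, and the sample line "P1;CODE1" are illustrative names and data, not part of the original code.

```java
import java.io.Serializable;
import java.lang.reflect.Method;

public class BeanMappingSketch {

    /** Hypothetical stand-in for the question's `funds` bean. */
    public static class Funds implements Serializable {
        private String portfolio_id;
        private String portfolio_code;

        public String getPortfolio_id() { return portfolio_id; }
        public void setPortfolio_id(String v) { this.portfolio_id = v; }
        public String getPortfolio_code() { return portfolio_code; }
        public void setPortfolio_code(String v) { this.portfolio_code = v; }
    }

    /**
     * Invokes the Funds getter reflectively on an arbitrary element,
     * roughly what a bean-based schema reader does for every record.
     */
    public static Object readPortfolioId(Object element) {
        try {
            Method getter = Funds.class.getMethod("getPortfolio_id");
            // Throws IllegalArgumentException if element is not a Funds instance.
            return getter.invoke(element);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Turns one semicolon-separated line into a Funds bean (assumed layout: id;code). */
    public static Funds parseLine(String line) {
        String[] tokens = line.split(";");
        if (tokens.length < 2) {
            throw new IllegalArgumentException("expected at least 2 fields: " + line);
        }
        Funds f = new Funds();
        f.setPortfolio_id(tokens[0]);
        f.setPortfolio_code(tokens[1]);
        return f;
    }

    public static void main(String[] args) {
        // A String element, like those in rowJavaRDD.values(), is rejected.
        try {
            readPortfolioId("P1");
        } catch (IllegalArgumentException e) {
            System.out.println("String element rejected: " + e.getMessage());
        }

        // An element that really is a bean can be read.
        Funds f = parseLine("P1;CODE1");
        System.out.println(readPortfolioId(f) + " / " + f.getPortfolio_code());
    }
}
```

If the data has to stay in an RDD rather than a local list, a function like parseLine could presumably be applied with JavaRDD.map first, so that the RDD holds beans before it is handed to createDataFrame together with the bean class. That variant is untested here.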