2015-10-05 111 views
0

如何將csv轉換爲Rdd [雙]?我有錯誤:無法在該行被應用到(org.apache.spark.rdd.RDD [單位]):將Rdd [矢量]轉換爲Rdd [雙]

val kd = new KernelDensity().setSample(rows) 

我完整的代碼是在這裏:

import org.apache.spark.mllib.linalg.Vectors 
    import org.apache.spark.mllib.linalg.distributed.RowMatrix 
    import org.apache.spark.mllib.stat.KernelDensity 
    import org.apache.spark.rdd.RDD 
    import org.apache.spark.{SparkContext, SparkConf} 

class KdeAnalysis { 
    val conf = new SparkConf().setAppName("sample").setMaster("local") 
    val sc = new SparkContext(conf) 

    val DATAFILE: String = "C:\\Users\\ajohn\\Desktop\\spark_R\\data\\mass_cytometry\\mass.csv" 
    val rows = sc.textFile(DATAFILE).map { 
    line => val values = line.split(',').map(_.toDouble) 
     Vectors.dense(values) 
    }.cache() 



    // Construct the density estimator with the sample data and a standard deviation for the Gaussian 
    // kernels 
    val rdd : RDD[Double] = sc.parallelize(rows) 
    val kd = new KernelDensity().setSample(rdd) 
    .setBandwidth(3.0) 

    // Find density estimates for the given values 
    val densities = kd.estimate(Array(-1.0, 2.0, 5.0)) 
} 
+0

我沒有看到你在那裏你可以得到'RDD [單位]任何地方'。 – zero323

回答

2

由於rows是一個RDD[org.apache.spark.mllib.linalg.Vector]以下線路不能正常工作:

val rdd : RDD[Double] = sc.parallelize(rows) 

parallelize預計Seq[T]RDD不是Seq

即使這部分工作正如你所期望的那樣,你的輸入是完全錯誤的。 KernelDensity.setSample的正確參數是RDD[Double]JavaRDD[java.lang.Double]。看起來它現在不支持多元數據。

關於從瓦的問題,你可以flatMap

rows.flatMap(_.toArray) 

,甚至更好,當你創建rows

val rows = sc.textFile(DATAFILE).flatMap(_.split(',').map(_.toDouble)).cache() 

,但我懷疑這真的是你所需要的。

0

準備了此代碼,請評價,如果它可以幫你 - >

val doubleRDD = rows.map(_.toArray).flatMap(x => x)