
Hi everyone, I want to combine an RDD[Vector] and an RDD[Int] into an RDD[Vector]. I use KMeans to predict the clusters, and the idea is to attach to each vector its corresponding cluster. Here is how I merge the two RDDs of different types:

import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans

val spark = SparkSession.builder.master("local").appName("my-spark-app").getOrCreate()
val data = spark.sparkContext.textFile("C:/spark/data/mllib/kmeans_data.txt")
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache() // RDD[Vector]
val clusters = KMeans.train(parsedData, numClusters, numIterations)
val resultatOfprediction = clusters.predict(parsedData) // RDD[Int]
val finalData = parsedData.zip(resultatOfprediction) // RDD[(Vector, Int)]
finalData.collect().foreach(println)

The result is:

([0.0,0.0,0.0],0) 
([0.1,0.1,0.1],0) 
([0.2,0.2,0.2],0) 
([9.0,9.0,9.0],1) 
([9.1,9.1,9.1],1) 
([9.2,9.2,9.2],1) 

The output I want is:

[0.0,0.0,0.0,1.0]
[0.1,0.1,0.1,1.0]
[0.2,0.2,0.2,1.0]
[9.0,9.0,9.0,0.0]
[9.1,9.1,9.1,0.0]
[9.2,9.2,9.2,0.0]

The goal is that I want to save a final RDD[Vector] to a txt file as a grid, but the result you provide does not have the form of an RDD[Vector].
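In other words, the transformation being asked for looks roughly like this (a minimal sketch, assuming the finalData RDD[(Vector, Int)] built above; the output path is made up for illustration):

// Sketch: fold each predicted cluster into its vector, giving an RDD[Vector],
// then save it as text. finalData comes from the code above; the path is illustrative.
val vectorsWithLabel = finalData.map { case (v, cluster) =>
  Vectors.dense(v.toArray :+ cluster.toDouble)
}
vectorsWithLabel.saveAsTextFile("C:/spark/output/grid")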

Answer

To get the result you want, you need to zip the two RDDs. Here is how you can do it (using literal example data in place of your RDDs):

// Stands in for the predicted cluster labels (your RDD[Int], here as doubles).
val parsedData = spark.sparkContext.parallelize(Seq(1.0, 1.0, 1.0, 0.0, 0.0, 0.0))

// Stands in for the data vectors (your RDD[Vector], here modeled as tuples).
val resultatOfprediction = spark.sparkContext.parallelize(Seq(
    (0.0,0.0,0.0),
    (0.1,0.1,0.1),
    (0.2,0.2,0.2),
    (9.0,9.0,9.0),
    (9.1,9.1,9.1),
    (9.2,9.2,9.2)
))

resultatOfprediction.zip(parsedData) 

Since the zip returns a tuple, you can get the result as:

resultatOfprediction.zip(parsedData) 
     .map(t => (t._1._1, t._1._2, t._1._3, t._2)) 
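
Collecting and printing that mapped RDD should give rows in exactly the layout asked for:

(0.0,0.0,0.0,1.0)
(0.1,0.1,0.1,1.0)
(0.2,0.2,0.2,1.0)
(9.0,9.0,9.0,0.0)
(9.1,9.1,9.1,0.0)
(9.2,9.2,9.2,0.0)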

For a dynamic number of columns, you can do the following, as suggested by @Rahul-Sukla:

resultatOfprediction.zip(parsedData)
     .map(t => t._1.productIterator.toList.map(_.asInstanceOf[Double]) :+ t._2)
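To see why productIterator handles tuples of any arity, here is a small standalone illustration (plain Scala, no Spark needed; the values are taken from the example above):

// productIterator walks a tuple's fields one by one, so the same code
// works no matter how many columns the tuple has.
val row = ((0.1, 0.1, 0.1), 1.0)
val flat = row._1.productIterator.toList.map(_.asInstanceOf[Double]) :+ row._2
// flat == List(0.1, 0.1, 0.1, 1.0)

Scala tuples are Products, which is what makes productIterator available; an MLlib Vector is not a tuple, so for real vectors v.toArray :+ label achieves the same flattening.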

Hope this helps!

Comments:

Please check the update, thanks.

Check the updated answer.

I did not get a correct answer. Could you please provide another answer? Thanks.