2016-09-14 91 views
0

的元件的斯帕塞夫克託,我val rdd = RDD[(x: Int, y:Int), cov:Double]類型的Scala中,其中RDD的每個元素表示與x表示行,y表示列和cov表示矩陣的元素的數據結構元素的值:創建從使用火花的RDD

我需要從這個矩陣的行創建SparseVectors。所以我決定先RDD轉換爲RDD[x: Int, (y:Int, cov:Double)],然後用groupByKey把特定行的所有元素結合在一起是這樣的:

val rdd2 = rdd.map{case ((x,y),cov) => (x, (y, cov))}.groupByKey()

現在我需要創建SparseVectors:

val N = 7  //Vector Size 
val spvec = {(x: Int,y: Iterable[(Int, Double)]) => new SparseVector(N.toLong, Array(y.map(el => el._1.toInt)), Array(y.map(el => el._2.toDouble)))} 
val vecs = rdd2.map(spvec) 

但是,這是彈出的錯誤。

type mismatch; found :Iterable[Int] required:Int 
type mismatch; found :Iterable[Double] required:Double 

我猜測y.map(el => el._1.toInt)返回一個數組無法應用的迭代。我很感激,如果有人可以幫助如何做到這一點。

回答

0

最簡單的解決方法是將轉換爲RowMatrix

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} 

val rdd: RDD[((Int, Int), Double)] = ??? 

val vs: RDD[org.apache.spark.mllib.linalg.SparseVector]= new CoordinateMatrix(
    rdd.map{ 
    case ((x, y), cov) => MatrixEntry(x, y, cov) 
    } 
).toRowMatrix.rows.map(_.toSparse) 

如果您想保留行標號可以使用toIndexedRowMatrix代替:

import org.apache.spark.mllib.linalg.distributed.IndexedRow 

new CoordinateMatrix(
    rdd.map{ 
    case ((x, y), cov) => MatrixEntry(x, y, cov) 
    } 
).toIndexedRowMatrix.rows.map { case IndexedRow(i, vs) => (i, vs.toSparse) } 
+0

謝謝。它適用於'toRowMatrix',但不適用於'toIndexedRowMatrix',表示'value toSparse不是org.apache.spark.mllib.linalg.distributed.IndexedRow'的成員。我確實想保留行索引。 – EdgeRover

+0

因爲它包含'IndexedRows'而不是'Vectors'。 – zero323