2015-04-17 143 views
1

我是新來的火花,不明白mapreduce機制如何與火花工作。我有一個只有雙打的csv文件,我想要的是用第一個矢量與rdd的其餘部分進行操作(計算euclidian距離)。然後迭代其他向量。它是否以另一種方式存在?也許明智地使用笛卡爾產品...如何避免使用Spark循環?

val rdd = sc.parallelize(Array((1,Vectors.dense(1,2)),(2,Vectors.dense(3,4),...))) 
val array_vects = rdd.collect 
val size = rdd.count 
val emptyArray = Array((0,Vectors.dense(0))).tail 
var rdd_rez = sc.parallelize(emptyArray) 

for(ind <- 0 to size -1) { 
    val vector = array_vects(ind)._2 
    val rest = rdd.filter(x => x._1 != ind) 
    val rdd_dist = rest.map(x => (x._1 , Vectors.sqdist(x._2,vector))) 
    rdd_rez = rdd_rez ++ rdd_dist 
} 

感謝您的支持。

回答

4

(所有向量對之間)的距離可以用rdd.cartesian計算:

val rdd = sc.parallelize(Array((1,Vectors.dense(1,2)), 
           (2,Vectors.dense(3,4)),...)) 
val product = rdd.cartesian(rdd) 

val result = product.filter{ case ((a, b), (c, d)) => a != c } 
        .map { case ((a, b), (c, d)) => 
            (a, Vectors.sqdist(b, d)) } 
0

我不認爲你爲什麼要這樣做。你可以簡單地做到這一點如下。

val initialArray = Array((1,Vectors.dense(1,2)), (2,Vectors.dense(3,4)),...) 

val firstVector = initialArray(0) 

val initialRdd = sc.parallelize(initialArray) 

val euclideanRdd = initialRdd.map({ case (i, vec) => (i, euclidean(firstVector, vec)) }) 

我們定義一個函數euclidean這需要兩個密集的載體,並返回歐幾里得距離。

+0

喔......他只想要第一個向量... OMG ......編輯答案... –

+0

無,「!hen與其他向量迭代」。那是在做了與所有其他人的第一個向量的歐幾里德距離之後) –

+0

喲意味着什麼....你是說你想要每個向量與其他向量的歐氏距離? –