2

這是來自here的後續問題。我正在嘗試基於這個implementation實現k-means。它很好,,但我想取代groupByKey()reduceByKey(),但我不知道如何(我不擔心現在的表現)。下面是相關縮小的代碼:用reduceByKey替換groupByKey()

val data = sc.textFile("dense.txt").map(
     t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache() 

val read_mean_centroids = sc.textFile("centroids.txt").map(
     t => (t.split("#")(0), parseVector(t.split("#")(1)))) 
var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2) 
do { 
    var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2)) 
    var pointsGroup = closest.groupByKey() // <-- THE VICTIM :) 
    var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap() 
    .. 

注意println(newCentroids)會給:

地圖(23 - >(-6.269305E-4,-0.0011746404,-4.08004E-5),8 - >(-5.108732E-4,7.336348E-4,-3.707591E-4),17 - >(-0.0016383086,-0.0016974678,1.45 ..

println(closest)和:

MapPartitionsRDD [6] at map at kmeans.scala:75

相關問題:Using reduceByKey in Apache Spark (Scala)


一些documentation

DEF reduceByKey(FUNC:(V,V)⇒V):RDD [(K,V)]

合併的值用於每個鍵使用關聯減少函數。

DEF reduceByKey(FUNC:(V,V)⇒V,numPartitions:智力):RDD [(K,V)]

合併使用的締合減少函數中的每個鍵的值。

DEF reduceByKey(分割器:分區程序,FUNC:(V,V)⇒V):RDD [(K,V)]

使用關聯減少功能合併用於每個鍵的值。

DEF groupByKey():RDD [(K,可迭代[V])]

組在RDD每個鍵成一個單一的序列值。

回答

5

你可以使用一個aggregateByKey()(略高於reduceByKey()更自然)這樣來計算newCentroids

val newCentroids = closest.aggregateByKey((Vector.zeros(dim), 0L))(
    (agg, v) => (agg._1 += v, agg._2 + 1L), 
    (agg1, agg2) => (agg1._1 += agg2._1, agg1._2 + agg2._2) 
).mapValues(agg => agg._1/agg._2).collectAsMap 

對於這個工作,你需要計算你的數據,即dim的維度,但你只需要做一次。你可以使用類似val dim = data.first._2.length的東西。

+0

工作就像一個魅力!你能解釋我們在這裏做了什麼嗎?我的意思是爲什麼我想用reduceByKey()替換groupByKey()?這樣做的主要優點是什麼?相關:http://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work – gsamaras

+1

好吧,'groupByKey'會導致一堆東西被髮送到各個節點之間即所有與給定鍵相關的值,用於所有鍵和數據的部分。另一方面,使用'aggregateByKey'方法,每個部分只負責向(向駕駛員)傳送由總和和計數組成的對。這麼少的網絡通信以及無需創建所有這些值的集合(因爲它只是它們的總和和數量在計算平均值時很重要)。 –

+0

好吧,這就是我的想法,非常感謝! – gsamaras