2016-09-19

Failed to add keys to a map in parallel

I have the following code:

var res: GenMap[Point, GenSeq[Point]] = points.par.groupBy(point => findClosest(point, means))
means.par.foreach(mean => if (!res.contains(mean)) {
  println("Map doesn't contain mean: " + mean)
  res += mean -> GenSeq.empty[Point]
  println("Map contains?: " + res.contains(mean))
})

using this case class:

case class Point(val x: Double, val y: Double, val z: Double) 

Basically, the code groups the Point elements in points around the Point elements in means. The algorithm itself isn't very important, though.
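The question does not show findClosest. A minimal sketch of what it presumably does, assuming "closest" means the smallest Euclidean distance (compared via the squared distance, which gives the same ordering); the helper squaredDistance and the function body are assumptions, not the asker's code:

```scala
object Closest {
  case class Point(x: Double, y: Double, z: Double)

  // Squared Euclidean distance; skipping the sqrt keeps the same ordering.
  def squaredDistance(a: Point, b: Point): Double = {
    val dx = a.x - b.x
    val dy = a.y - b.y
    val dz = a.z - b.z
    dx * dx + dy * dy + dz * dz
  }

  // Hypothetical reconstruction: return the mean nearest to `point`.
  def findClosest(point: Point, means: Seq[Point]): Point =
    means.minBy(m => squaredDistance(point, m))
}
```

Note that groupBy only creates keys for means that actually receive at least one point, which is why the question has to add empty groups for the remaining means afterwards.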

My problem is that I get the following output:

Map doesn't contain mean: (0.44, 0.59, 0.73) 
Map doesn't contain mean: (0.44, 0.59, 0.73) 
Map doesn't contain mean: (0.1, 0.11, 0.11) 
Map doesn't contain mean: (0.1, 0.11, 0.11) 
Map contains?: true 
Map contains?: true 
Map contains?: false 
Map contains?: true 

Why do I ever get this?

Map contains?: false 

I'm checking whether the key is in the res map, and if it isn't, I add it. So how can it not be in the map?

Is this a parallelism issue?


Does the problem also happen without parallelization? – Samar

Answers


There is a race condition in your code:

res += mean -> GenSeq.empty[Point] 

Several threads reassign res at the same time, so some of the entries can be lost.
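Because res is an immutable map held in a var, res += kv is shorthand for res = res + kv: a read of the current map followed by a write of a new one. The deterministic sketch below replays one bad interleaving of two threads by hand (no real threads are started) to show how an entry disappears; the object and method names are illustrative only.

```scala
object LostUpdate {
  // Replays, step by step, one interleaving of two threads that each run
  // `res += key -> Nil` on a shared var holding an immutable Map.
  def replayInterleaving(): Map[String, List[Int]] = {
    var res = Map.empty[String, List[Int]]
    val seenByA = res              // thread A reads res
    val seenByB = res              // thread B reads res before A has written
    res = seenByA + ("a" -> Nil)   // thread A writes a map containing "a"
    res = seenByB + ("b" -> Nil)   // thread B overwrites it, so "a" is lost
    res
  }

  def main(args: Array[String]): Unit =
    println(replayInterleaving())  // Map(b -> List())
}
```

This is exactly the "Map contains?: false" case: thread A adds its key, another thread writes back a copy built from an older snapshot, and A's check then finds its key missing.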

This code fixes the problem:

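import scala.collection.GenSeq
import scala.collection.parallel.immutable.ParMap

val closest = points.par.groupBy(point => findClosest(point, means))
// The explicit type argument lets the accumulator hold GenSeq.empty as well as
// the parallel sequences produced by groupBy.
val res = means.foldLeft[ParMap[Point, GenSeq[Point]]](closest) {
  case (map, mean) =>
    if (map.contains(mean)) map
    else map + (mean -> GenSeq.empty[Point])
}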
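The same "add the missing keys afterwards" step can also be written without foldLeft, by merging in only the means that groupBy did not produce. A minimal sketch on ordinary sequential collections (the helper name withEmptyGroups is hypothetical); it builds a new map from immutable inputs, so there is no shared mutable state to race on:

```scala
object FillMissing {
  // Hypothetical helper: add an empty group for every key that has no members.
  def withEmptyGroups[K, V](grouped: Map[K, Seq[V]], keys: Seq[K]): Map[K, Seq[V]] =
    grouped ++ keys.filterNot(grouped.contains).map(k => k -> Seq.empty[V])
}
```

Existing groups are left untouched; only keys absent from grouped receive an empty Seq.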

This works and looks elegant, but it isn't parallel. – octavian


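You're right, the second part isn't executed in parallel. I think the computationally expensive part is running the findClosest() function. This is a simple solution that should be quite efficient in terms of parallelism (although it could be improved). If 'means' isn't very large, I think it will be as efficient as a parallel solution, if not more so. Keep in mind that a parallel program always carries some overhead, and sometimes it isn't worth parallelizing every part of the execution. – Mikel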
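If the key-filling loop itself had to stay parallel, one alternative that the thread does not discuss is to make the shared map thread-safe instead of reassigning a var. A sketch using scala.collection.concurrent.TrieMap, whose putIfAbsent is atomic, with plain Threads standing in for the parallel collection (the object and method names are illustrative, and String/Int stand in for Point):

```scala
import scala.collection.concurrent.TrieMap

object ConcurrentFill {
  // Every worker tries to add an empty group for every key. putIfAbsent is
  // atomic, so concurrent attempts cannot overwrite or drop each other's keys,
  // and an existing group would be kept rather than replaced.
  def fillConcurrently(keys: Seq[String], threads: Int): Map[String, Seq[Int]] = {
    val res = TrieMap.empty[String, Seq[Int]]
    val workers = (0 until threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = keys.foreach(k => res.putIfAbsent(k, Seq.empty[Int]))
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    res.toMap
  }
}
```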


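Processing the points changes the means, and the result depends on the order in which the points are processed, so the algorithm does not lend itself to parallel execution. If parallel execution matters enough to justify changing the algorithm, you can look for one that can be applied in parallel.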
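To make the order sensitivity concrete, here is a hypothetical one-dimensional online update (an assumption about the kind of algorithm meant, in the style of sequential k-means): each point is assigned to its nearest current mean, and that mean is immediately moved to the running average. Feeding the same two points in the two possible orders gives different means.

```scala
object OrderSensitivity {
  // Hypothetical 1-D online update: each point goes to its nearest current
  // mean, which is immediately replaced by the running average of that mean's
  // points (the initial value counts as one point).
  def onlineMeans(points: Seq[Double], initial: Vector[Double]): Vector[Double] = {
    val start = (initial, Vector.fill(initial.size)(1))
    val (means, _) = points.foldLeft(start) { case ((ms, counts), p) =>
      val i = ms.indices.minBy(j => math.abs(ms(j) - p))
      val n = counts(i) + 1
      (ms.updated(i, ms(i) + (p - ms(i)) / n), counts.updated(i, n))
    }
    means
  }

  def main(args: Array[String]): Unit = {
    println(onlineMeans(Seq(4.5, 5.5), Vector(0.0, 10.0)))
    println(onlineMeans(Seq(5.5, 4.5), Vector(0.0, 10.0)))
  }
}
```

With the order 4.5, 5.5 the first mean absorbs both points (about 3.33 and 10.0); with 5.5, 4.5 the second one does (0.0 and about 6.67), so splitting the points across threads would make the result depend on scheduling.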

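Using a fixed, known set of grouping points, such as the centres of the cells of a grid, means that each point can be assigned to its grouping point in parallel, and the points can then be grouped by grouping point in parallel: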

// Written for Scala 2.12, where parallel collections and GenMap/GenSeq
// are part of the standard library.
import scala.collection.parallel.ParMap
import scala.collection.{GenMap, GenSeq, Map}
import scala.math._
import scala.util.Random

class ParallelPoint {
  val rng = new Random(0)

  // Fixed grouping points: the nodes of a 101 x 101 x 101 grid with spacing 10.
  // Each node maps to itself, so the map doubles as a lookup table.
  val groups: Map[Point, Point] = (for {
    i <- 0 to 100
    j <- 0 to 100
    k <- 0 to 100
  } yield {
    val p = Point(10.0 * i, 10.0 * j, 10.0 * k)
    p -> p
  }).toMap

  // 10 million random points in the cube [0, 1000)^3.
  val points: Array[Point] = Array.fill(10000000)(
    Point(rng.nextDouble() * 1000.0, rng.nextDouble() * 1000.0, rng.nextDouble() * 1000.0))

  // Snap a point to its nearest grid node. This needs no information about
  // other points, so it can safely run in parallel.
  def findClosest(point: Point, groups: GenMap[Point, Point]): (Point, Point) = {
    val x: Double = rint(point.x / 10.0) * 10.0
    val y: Double = rint(point.y / 10.0) * 10.0
    val z: Double = rint(point.z / 10.0) * 10.0

    val mean: Point = groups.getOrElse(Point(x, y, z),
      throw new IllegalArgumentException(s"$point out of range of mean ($x, $y, $z)."))

    (mean, point)
  }

  // Sum of the points, or None for an empty sequence.
  private def total(points: GenSeq[Point]): Option[Point] =
    points.reduceOption(_ + _)

  def mean(points: GenSeq[Point]): Option[Point] =
    total(points).map(_ / points.size)

  val startTime = System.currentTimeMillis()
  println("starting test ...")

  // Assign every point to its grid node in parallel, group by node, and keep
  // only the original points as each group's values.
  val res: ParMap[Point, GenSeq[Point]] = points.par
    .map(p => findClosest(p, groups))
    .groupBy(pp => pp._1)
    .map(kv => kv._1 -> kv._2.map(v => v._2))

  val groupTime = System.currentTimeMillis()
  println(s"... grouped result after ${groupTime - startTime}ms ...")

  // Sanity check: every point must appear in the group of its grid node.
  points.par.foreach { p =>
    if (!res(findClosest(p, groups)._1).exists(_ == p)) println(s"point $p not found")
  }

  val checkTime = System.currentTimeMillis()
  println(s"... checked grouped result after ${checkTime - startTime}ms ...")

  // Replace each grid node (the key) with the mean of the points grouped under it.
  // Every key in res has at least one point, so .get cannot fail here.
  val means: ParMap[Point, GenSeq[Point]] = res.map { kv => mean(kv._2).get -> kv._2 }

  val meansTime = System.currentTimeMillis()
  println(s"... means calculated after ${meansTime - startTime}ms.")
}

object ParallelPoint {
  def main(args: Array[String]): Unit = new ParallelPoint()
}

case class Point(x: Double, y: Double, z: Double) {
  def +(that: Point): Point = Point(this.x + that.x, this.y + that.y, this.z + that.z)

  def /(scale: Double): Point = Point(x / scale, y / scale, z / scale)
}

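The final step replaces each grouping point, used as a map key, with the computed mean of the points grouped under it. This processes 10 million points in about 30 seconds on my 2011 MBP.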
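The fixed-grid version parallelises because each point's key depends only on the point itself, so groups built independently on separate chunks can simply be merged. A sequential sketch of that map-and-merge structure, assuming a grid spacing of 10 as in the code above (the names snap, groupChunk and mergeGroups are hypothetical):

```scala
object GridGrouping {
  case class Point(x: Double, y: Double, z: Double)

  // Snap a point to the nearest grid node; depends only on the point itself.
  def snap(p: Point, spacing: Double): Point = {
    def r(v: Double): Double = math.rint(v / spacing) * spacing
    Point(r(p.x), r(p.y), r(p.z))
  }

  // Group one chunk of points by grid node.
  def groupChunk(chunk: Seq[Point], spacing: Double): Map[Point, Seq[Point]] =
    chunk.groupBy(p => snap(p, spacing))

  // Merge two partial groupings by concatenating groups that share a key.
  def mergeGroups(a: Map[Point, Seq[Point]], b: Map[Point, Seq[Point]]): Map[Point, Seq[Point]] =
    b.foldLeft(a) { case (acc, (k, vs)) =>
      acc.updated(k, acc.getOrElse(k, Seq.empty[Point]) ++ vs)
    }
}
```

Grouping each chunk separately and merging gives the same result as grouping everything at once, which is the property a parallel groupBy relies on; the online-mean approach from the question has no such property.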