2015-07-21 52 views
0

我有這段代碼,它在獨立工作時很好,但在AWS上的4個從屬集羣(8個內存30Go內存)上工作速度很慢。集羣上的Spark作業比單獨的更慢

For a file of 10000 entries 
Standalone : 257s 
Aws 4S : 369s 

    def tabHash(nb:Int, dim:Int) = { 

     var tabHash0 = Array(Array(0.0)).tail 

     for(ind <- 0 to nb-1) { 
      var vechash1 = Array(0.0).tail 
      for(ind <- 0 to dim-1) { 
       val nG = Random.nextGaussian 
       vechash1 = vechash1 :+ nG 
      } 
      tabHash0 = tabHash0 :+ vechash1 
     } 
     tabHash0 
    } 

    def hashmin3(x:Vector, w:Double, b:Double, tabHash1:Array[Array[Double]]) = { 

     var tabHash0 = Array(0.0).tail 
     val x1 = x.toArray 
     for(ind <- 0 to tabHash1.size-1) { 
      var sum = 0.0 
      for(ind2 <- 0 to x1.size-1) { 
       sum = sum + (x1(ind2)*tabHash1(ind)(ind2)) 
      }   
      tabHash0 = tabHash0 :+ (sum+b)/w 
     } 
     tabHash0 

    } 

    def pow2(tab1:Array[Double], tab2:Array[Double]) = { 

     var sum = 0.0 
     for(ind <- 0 to tab1.size-1) { 
      sum = sum - Math.pow(tab1(ind)-tab2(ind),2) 
     } 
     sum 
    } 


     val w = ww 
     val b = Random.nextDouble * w 
     val tabHash2 = tabHash(nbseg,dim) 

     var rdd_0 = parsedData.map(x => (x.get_id,(x.get_vector,hashmin3(x.get_vector,w,b,tabHash2)))).cache 

     var rdd_Yet = rdd_0 

     for(ind <- 1 to maxIterForYstar ) { 

      var rdd_dist = rdd_Yet.cartesian(rdd_0).flatMap{ case (x,y) => Some((x._1,(y._2._1,pow2(x._2._2,y._2._2))))}//.coalesce(64) 

      var rdd_knn = rdd_dist.topByKey(k)(Ordering[(Double)].on(x=>x._2)) 

      var rdd_bary = rdd_knn.map(x=> (x._1,Vectors.dense(bary(x._2,k)))) 

      rdd_Yet = rdd_bary.map(x=>(x._1,(x._2,hashmin3(x._2,w,b,tabHash2)))) 


     } 

我試圖播放一些變量

 val w = sc.broadcast(ww.toDouble) 
     val b = sc.broadcast(Random.nextDouble * ww) 
     val tabHash2 = sc.broadcast(tabHash(nbseg,dim)) 

沒有任何影響

我知道這不是巴里功能,因爲我想這個代碼的另一個版本,而不hashmin3與4個奴隸工作正常更糟糕的是8個奴隸是另一個話題。

+0

「對於10000個條目的文件」這是一個小數據集。分配它的開銷可能大於parallleliism的儲蓄 –

+0

我將自己的數據集製作成笛卡爾積,因此認爲我有足夠的數據。 – KyBe

回答

1

錯誤代碼。特別適用於分佈式和大型計算。我不能快速說出問題的根源,但無論如何你必須重寫這段代碼。

  1. 數組對於通用和可共享的數據來說是很糟糕的。它是可變的,需要連續的內存分配(即使你擁有足夠的內存,最後可能會出現問題)。更好地使用Vector(或有時列出)。千萬不要使用數組。
  2. var vechash1 = Array(0.0).tail您使用一個元素創建集合,然後調用函數來獲取空集合。如果它很少,不用擔心表演,但它很醜! var vechash1: Array[Double] = Array()var vechash1: Vector[Double] = Vector()var vechash1 = Vector.empty[Double]
  3. def tabHash(nb:Int, dim:Int) =當它不清楚時總是設置函數的返回類型。斯卡拉的力量是豐富的類型系統。編譯時間檢查非常有用(關於你究竟得到的結果,而不是你想象的結果!)。處理大量數據時非常重要,因爲編譯檢查會節省您的時間和金錢。稍後閱讀這些代碼也更容易。 def tabHash(nb:Int, dim:Int): Vector[Vector[Double]] =
  4. def hashmin3(x: Vector,錯字?它不能編譯沒有類型參數。

第一功能更加緊湊:

def tabHash(nb:Int, dim:Int): Vector[Vector[Double]] = { 
    (0 to nb-1).map {_ => 
    (0 to dim - 1).map(_ => Random.nextGaussian()).toVector 
    }.toVector 
} 

第二個功能是((x*M) + scalar_b)/scalar_w。使用專門針對矩陣工作而優化的庫可能更有效率。

三(我在這裏想錯與計算的跡象,如果算上方誤差):

def pow2(tab1:Vector[Double], tab2:Vector[Double]): Double = 
     tab1.zip(tab2).map{case (t1,t2) => Math.pow(t1 - t2, 2)}.reduce(_ - _) 

var rdd_Yet = rdd_0 Cached RDD is rewrited in cycle. So it's useless storage. 

最後一個循環是很難分析。我認爲它必須簡化。