
I'm taking the Scala Spark course on Coursera, and I'm trying to optimize this snippet to avoid the shuffle in Spark:

val indexedMeansG = vectors.
  map(v => findClosest(v, means) -> v).
  groupByKey.mapValues(averageVectors)

vectors is an RDD[(Int, Int)]. To look at the RDD's dependency list and lineage, I used:

println(s"""GroupBy:                                           
      | Deps: ${indexedMeansG.dependencies.size}                                   
      | Deps: ${indexedMeansG.dependencies}                                     
      | Lineage: ${indexedMeansG.toDebugString}""".stripMargin) 

Which prints the following:

/* GroupBy:
 * Deps: 1
 * Deps: List(org.apache.spark.OneToOneDependency@...)
 * Lineage: (6) MapPartitionsRDD[18] at mapValues at StackOverflow.scala:207 []
 *   ShuffledRDD[17] at groupByKey at StackOverflow.scala:207 []
 *  +-(6) MapPartitionsRDD[16] at map at StackOverflow.scala:206 []
 *     MapPartitionsRDD[13] at map at StackOverflow.scala:139 []
 *       CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 *     MapPartitionsRDD[12] at values at StackOverflow.scala:116 []
 *     MapPartitionsRDD[11] at mapValues at StackOverflow.scala:115 []
 *     MapPartitionsRDD[10] at groupByKey at StackOverflow.scala:92 []
 *     MapPartitionsRDD[9] at join at StackOverflow.scala:91 []
 *     MapPartitionsRDD[8] at join at StackOverflow.scala:91 []
 *     CoGroupedRDD[7] at join at StackOverflow.scala:91 []
 *     +-(6) MapPartitionsRDD[4] at map at StackOverflow.scala:88 []
 *     |  MapPartitionsRDD[3] at filter at StackOverflow.scala:88 []
 *     |  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
 *     |  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
 *     |  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 []
 *     +-(6) MapPartitionsRDD[6] at map at StackOverflow.scala:89 []
 *        MapPartitionsRDD[5] at filter at StackOverflow.scala:89 []
 *        MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
 *        src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
 *        src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 [] */

From this List(org.apache.spark.OneToOneDependency@...) I infer that no shuffle is being done, right? However, ShuffledRDD[17] is printed below, which means there actually is a shuffle.

I have tried replacing the groupByKey call with reduceByKey, like this:

val indexedMeansR = vectors.
  map(v => findClosest(v, means) -> v).
  reduceByKey((a, b) => (a._1 + b._1)/2 -> (a._2 + b._2)/2)

And its dependencies and lineage are:

/* ReduceBy:
 * Deps: 1
 * Deps: List(org.apache.spark.ShuffleDependency@...)
 * Lineage: (6) ShuffledRDD[17] at reduceByKey at StackOverflow.scala:211 []
 *  +-(6) MapPartitionsRDD[16] at map at StackOverflow.scala:210 []
 *     MapPartitionsRDD[13] at map at StackOverflow.scala:139 []
 *       CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 *     MapPartitionsRDD[12] at values at StackOverflow.scala:116 []
 *     MapPartitionsRDD[11] at mapValues at StackOverflow.scala:115 []
 *     MapPartitionsRDD[10] at groupByKey at StackOverflow.scala:92 []
 *     MapPartitionsRDD[9] at join at StackOverflow.scala:91 []
 *     MapPartitionsRDD[8] at join at StackOverflow.scala:91 []
 *     CoGroupedRDD[7] at join at StackOverflow.scala:91 []
 *     +-(6) MapPartitionsRDD[4] at map at StackOverflow.scala:88 []
 *     |  MapPartitionsRDD[3] at filter at StackOverflow.scala:88 []
 *     |  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
 *     |  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
 *     |  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 []
 *     +-(6) MapPartitionsRDD[6] at map at StackOverflow.scala:89 []
 *        MapPartitionsRDD[5] at filter at StackOverflow.scala:89 []
 *        MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
 *        src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
 *        src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 [] */

This time the dependency is a ShuffleDependency, and I can't understand why.

Since this is a pair RDD whose keys are integers, and therefore have an ordering, I also tried changing the partitioner and using a RangePartitioner, but that doesn't improve things either.
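
Concretely, the RangePartitioner attempt looked roughly like the sketch below (the partition count of 6 just mirrors the lineage above; the intermediate name pairs is only for illustration):

import org.apache.spark.RangePartitioner

// Keyed RDD; the RangePartitioner samples it to compute its range bounds.
val pairs = vectors.map(v => findClosest(v, means) -> v)

// reduceByKey accepts an explicit partitioner, but the records still have to
// move so that equal keys land in the same partition, so the shuffle remains.
val indexedMeansP = pairs.reduceByKey(
  new RangePartitioner(6, pairs),
  (a, b) => (a._1 + b._1) / 2 -> (a._2 + b._2) / 2)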

Answer


A reduceByKey operation still involves a shuffle, because it still needs to ensure that all items with the same key end up in the same partition.

However, it will be a much smaller shuffle than the one groupByKey performs. A reduceByKey applies the reduce operation within each partition before shuffling, which cuts down the amount of data that has to be shuffled.
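
As a side note, the pairwise averaging in the question's reduce function is not associative, so map-side pre-aggregation can change the result. A minimal sketch of an associative alternative, assuming the values are (Int, Int) vectors as in the question, carries a (sum, count) accumulator and divides once at the end:

// Accumulate (sumX, sumY, count): an associative, commutative combine, so
// reduceByKey can safely pre-reduce inside each partition before the shuffle.
val sumsAndCounts = vectors.
  map(v => findClosest(v, means) -> (v._1.toLong, v._2.toLong, 1L)).
  reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))

// Divide once at the end to get the per-key mean vector.
val indexedMeans = sumsAndCounts.
  mapValues { case (sx, sy, n) => ((sx / n).toInt, (sy / n).toInt) }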


But then, according to the dependencies output, groupByKey has a OneToOneDependency, which doesn't involve a shuffle, while reduceByKey has a ShuffleDependency, which does. Why? – elbaulp


The 'OneToOneDependency' corresponds to the 'mapValues' call, not to the 'groupByKey' call. If you remove mapValues, you should see a 'ShuffleDependency'. Also, note the 'ShuffledRDD' in the 'groupByKey' lineage. –
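
A minimal sketch of the check this comment suggests, dropping the trailing mapValues so that the top-level RDD is the ShuffledRDD itself:

// Without the trailing mapValues, the top-level RDD is the ShuffledRDD,
// so its direct dependency list exposes the ShuffleDependency.
val grouped = vectors.
  map(v => findClosest(v, means) -> v).
  groupByKey

println(grouped.dependencies)
// expected: List(org.apache.spark.ShuffleDependency@...)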