
spark: how to merge shuffled RDDs efficiently?

I have five shuffled key-value RDDs: one large one (1,000,000 records) and four relatively small ones (100,000 records each). All of the RDDs have the same number of partitions, and I have two strategies for merging the five of them:

  1. Merge all five RDDs together
  2. Merge the four small RDDs together first, then join the result with the big one

I expected strategy 2 to be more efficient, because it would not re-shuffle the big RDD. But the experiment shows that strategy 1 is more efficient. The code and output are as follows:

Code

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkContext, SparkConf} 


object MergeStrategy extends App { 

    Logger.getLogger("org").setLevel(Level.ERROR) 
    Logger.getLogger("akka").setLevel(Level.ERROR) 

    val conf = new SparkConf().setMaster("local[4]").setAppName("test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val bigRddSize = 1e6.toInt 
    val smallRddSize = 1e5.toInt 
    println(bigRddSize) 

    // the big RDD: (random key, 0) pairs, shuffled by repartition and cached
    val bigRdd = sc.parallelize((0 until bigRddSize)
     .map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache
    bigRdd.take(10).foreach(println) 

    // four small RDDs built the same way, each shuffled and cached
    val smallRddList = (0 until 4).map(i => {
     val rst = sc.parallelize((0 until smallRddSize) 
      .map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache 
     println(rst.count) 
     rst 
    }).toArray 

    // strategy 1: union all five RDDs at once, then distinct
    { 
     val begin = System.currentTimeMillis 

     val s1Rst = sc.union(Array(bigRdd) ++ smallRddList).distinct(100) 
     println(s1Rst.count) 

     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("S1 time count: %.1f s".format(timeCost)) 
    } 

    // strategy 2: distinct the four small RDDs first, then full-outer-join with the big one
    { 
     val begin = System.currentTimeMillis 

     val smallMerged = sc.union(smallRddList).distinct(100).cache 
     println(smallMerged.count) 

     // rebuild (key, value) pairs from the join result, de-duplicating
     // pairs that appear on both sides
     val s2Rst = bigRdd.fullOuterJoin(smallMerged).flatMap {
      case (key, (Some(l), Some(r))) => Array((key, l), (key, r)).distinct
      case (key, (Some(l), None)) => Array((key, l))
      case (key, (None, Some(r))) => Array((key, r))
      case _ => throw new Exception("Cannot happen")
     }
     println(s2Rst.count) 

     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("S2 time count: %.1f s".format(timeCost)) 
    } 

} 

Output

1000000 
(688282474,0) 
(-255073127,0) 
(872746474,0) 
(-792516900,0) 
(417252803,0) 
(-1514224305,0) 
(1586932811,0) 
(1400718248,0) 
(939155130,0) 
(1475156418,0) 
100000 
100000 
100000 
100000 
1399777 
S1 time count: 39.7 s 
399984 
1399894 
S2 time count: 49.8 s 

Is my understanding of shuffled RDDs wrong? Could someone give me some advice? Thanks!


When you do the join in strategy 2, you are doing something you never do in strategy 1 (which is union only). Why? Remember that union does not need to shuffle the data – it can stitch together the RDD partitions as they already exist on each executor. More specifically, union only creates narrow dependencies, whereas join creates a shuffle dependency. So it looks like strategies 1 and 2 are apples and oranges. –
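
A quick way to see this difference (a minimal sketch; it assumes a live SparkContext sc, e.g. in spark-shell, and the sample data is illustrative):

val a = sc.parallelize(Seq((1, "a"), (2, "b")))
val b = sc.parallelize(Seq((2, "c"), (3, "d")))

// union wires only narrow RangeDependencies over its parents: no shuffle
a.union(b).dependencies.foreach(d => println(d.getClass.getSimpleName))

// join's lineage contains a shuffle stage (a ShuffleDependency under its CoGroupedRDD parent)
println(a.join(b).toDebugString)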


@SachinTyagi My goal is to distinct the five RDDs; both strategy 1 and strategy 2 end with a distinct, and distinct shuffles the data. Since the big RDD has already been shuffled, strategy 2 does not shuffle it again and should be more efficient, but the experiment shows the opposite. – bourneli
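
For reference, RDD.distinct is built on reduceByKey, which is why it shuffles. A paraphrased sketch of its shape (simplified from the Spark source, not the literal code):

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// roughly what rdd.distinct(numPartitions) does under the hood:
// tag every element, reduce duplicates per key (the shuffle), untag
def distinctSketch[T: ClassTag](rdd: RDD[T], numPartitions: Int): RDD[T] =
  rdd.map(x => (x, null))
    .reduceByKey((x, _) => x, numPartitions)
    .map(_._1)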


I'm not sure I understand this fully, but whenever you join, you introduce a shuffle dependency and thus end up (re)shuffling the data, regardless of whether your RDDs were shuffled earlier. That is consistent with what you are seeing. –

Answer


I found a way to merge RDDs more efficiently; see the following two merge strategies:

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.{HashPartitioner, SparkContext, SparkConf} 
import scala.collection.mutable.ArrayBuffer 

object MergeStrategy extends App { 

    Logger.getLogger("org").setLevel(Level.ERROR) 
    Logger.getLogger("akka").setLevel(Level.ERROR) 

    val conf = new SparkConf().setMaster("local[4]").setAppName("test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val rddCount = 20 
    val mergeCount = 5 
    val dataSize = 20000 
    val parts = 50 

    // generate data 
    scala.util.Random.setSeed(943343) 
    val testData = for (i <- 0 until rddCount) 
     yield sc.parallelize(scala.util.Random.shuffle((0 until dataSize).toList).map(x => (x, 0))) 
      .partitionBy(new HashPartitioner(parts)) 
      .cache 
    testData.foreach(x => println(x.count)) 

    // strategy 1: merge in batches, re-partitioning each merged batch with a HashPartitioner
    { 
     val buff = ArrayBuffer[RDD[(Int, Int)]]() 
     val begin = System.currentTimeMillis 
     for (i <- 0 until rddCount) { 
      buff += testData(i) 
      if ((buff.size >= mergeCount || i == rddCount - 1) && buff.size > 1) { 
       val merged = sc.union(buff).distinct 
        .partitionBy(new HashPartitioner(parts)).cache 
       println(merged.count) 

       buff.foreach(_.unpersist(false)) 
       buff.clear 
       buff += merged 
      } 
     } 
     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("Strategy 1 Time Cost: %.1f".format(timeCost)) 
     assert(buff.size == 1) 

     println("Strategy 1 Complete, with merged Count %s".format(buff(0).count)) 
    } 


    // strategy 2: merge in batches with distinct(parts) only, no explicit re-partitioning
    { 
     val buff = ArrayBuffer[RDD[(Int, Int)]]() 
     val begin = System.currentTimeMillis 
     for (i <- 0 until rddCount) { 
      buff += testData(i) 
      if ((buff.size >= mergeCount || i == rddCount - 1) && buff.size > 1) { 
       val merged = sc.union(buff).distinct(parts).cache 
       println(merged.count) 

       buff.foreach(_.unpersist(false)) 
       buff.clear 
       buff += merged 
      } 
     } 
     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("Strategy 2 Time Cost: %.1f".format(timeCost)) 
     assert(buff.size == 1) 

     println("Strategy 2 Complete, with merged Count %s".format(buff(0).count)) 
    } 

} 

The result shows that strategy 1 (time cost 20.8 s) is more efficient than strategy 2 (time cost 34.3 s). My machine runs Windows 8 with a 4-core 2.0 GHz CPU and 8 GB of memory.

The only difference is that strategy 1 re-partitions each merged result with a HashPartitioner, while strategy 2 does not. As a result, strategy 1 caches a ShuffledRDD while strategy 2 caches a MapPartitionsRDD. I think the RDD.distinct function handles a ShuffledRDD more efficiently than a MapPartitionsRDD.
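
To see what each strategy caches, here is a minimal sketch (assumes a live SparkContext sc; the data, names, and partition count are illustrative):

import org.apache.spark.HashPartitioner

val parts = 4
val data = sc.parallelize(Seq((1, 0), (2, 0), (1, 0)))

val viaPartitionBy = data.distinct(parts).partitionBy(new HashPartitioner(parts)) // like strategy 1
val viaDistinctOnly = data.distinct(parts)                                        // like strategy 2

println(viaPartitionBy.getClass.getSimpleName)  // ShuffledRDD
println(viaDistinctOnly.getClass.getSimpleName) // MapPartitionsRDD
println(viaPartitionBy.partitioner)             // Some(...HashPartitioner...): partitioning is recorded
println(viaDistinctOnly.partitioner)            // None: distinct's final map drops the partitioner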