0
我有5個洗牌的鍵值rdds,一個大的(1,000,000條記錄)和4個相對較小的(100,000條記錄)。所有的rdds都有相同數量的分區,我有兩個策略來合併5個分區,spark:如何有效地合併混洗rdd?
- 合併5個RDDS一起
- 合併了4個個小RDDS在一起,然後加入bigone
我認爲策略2會更有效,因爲它不會重新洗牌大一號。但實驗結果顯示策略1效率更高。該代碼和輸出如下:
代碼
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
object MergeStrategy extends App {
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val bigRddSize = 1e6.toInt
val smallRddSize = 1e5.toInt
println(bigRddSize)
val bigRdd = sc.parallelize((0 until bigRddSize)
.map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache
bigRdd.take(10).foreach(println)
val smallRddList = (0 until 4).map(i => {
val rst = sc.parallelize((0 until smallRddSize)
.map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache
println(rst.count)
rst
}).toArray
// strategy 1
{
val begin = System.currentTimeMillis
val s1Rst = sc.union(Array(bigRdd) ++ smallRddList).distinct(100)
println(s1Rst.count)
val end = System.currentTimeMillis
val timeCost = (end - begin)/1000d
println("S1 time count: %.1f s".format(timeCost))
}
// strategy 2
{
val begin = System.currentTimeMillis
val smallMerged = sc.union(smallRddList).distinct(100).cache
println(smallMerged.count)
val s2Rst = bigRdd.fullOuterJoin(smallMerged).flatMap({ case (key, (left, right)) => {
if (left.isDefined && right.isDefined) Array((key, left.get), (key, right.get)).distinct
else if (left.isDefined) Array((key, left.get))
else if (right.isDefined) Array((key, right.get))
else throw new Exception("Cannot happen")
}
})
println(s2Rst.count)
val end = System.currentTimeMillis
val timeCost = (end - begin)/1000d
println("S2 time count: %.1f s".format(timeCost))
}
}
輸出
1000000
(688282474,0)
(-255073127,0)
(872746474,0)
(-792516900,0)
(417252803,0)
(-1514224305,0)
(1586932811,0)
(1400718248,0)
(939155130,0)
(1475156418,0)
100000
100000
100000
100000
1399777
S1 time count: 39.7 s
399984
1399894
S2 time count: 49.8 s
我的洗牌RDD理解錯了?有人可以提供一些建議嗎? 謝謝!
當您在策略2中進行連接時,您沒有在策略1(僅聯合)中進行任何聯接。爲什麼?請記住,union不需要對數據進行混洗 - 它可以將每個執行器上存在的RDD拼湊起來。更具體地說,Union只創建一個狹義的依賴關係,而join則創建一個shuffle依賴關係。所以看起來策略1和2是蘋果和桔子。 –
@SachinTyagi我的目標是區分5 rdds,策略1和2最後都不同。不同的將洗牌數據。由於大rdd已經洗牌,因此策略2不會洗牌大牌,應該更有效率,但實驗顯示相反。 – bourneli
我不太清楚我的理解,但是無論何時加入,您都會引入一個隨機播放的依賴關係,從而最終(重新)洗牌數據。不管你的rdd早些時候是否洗牌。這與你所看到的一致。 –