2015-10-16 44 views
0

我有兩個RDD都有兩列(K,V)。在那些RDD的密鑰源中,密鑰顯示爲一個在另一個之下,並且對於每一行,將不同且不同的值分配給密鑰。這篇文章的底部給出了創建RDD的文本文件。是否有可能加入兩個rdds的值以避免昂貴的洗牌?

兩個RDD中的鍵完全不同,我想根據它們的值加入兩個RDD,並嘗試查找每對存在多少個常見值。例如我試圖達到諸如(1-5,10)的結果,這意味着來自RDD1的關鍵值「1」和來自RDD2的關鍵值「5」共享共同的10個值。

我在256 GB RAM和72核心的單臺機器上工作。一個文本文件是500 MB,而另一個是3 MB。

這裏是我的代碼:

val conf = new SparkConf().setAppName("app").setMaster("local[*]").set("spark.shuffle.spill", "true") 
.set("spark.shuffle.memoryFraction", "0.4") 
.set("spark.executor.memory","128g") 
.set("spark.driver.maxResultSize", "0") 

val RDD1 = sc.textFile("\\t1.txt",1000).map{line => val s = line.split("\t"); (s(0),s(1))} 

val RDD2 = sc.textFile("\\t2.txt",1000).map{line => val s = line.split("\t"); (s(1),s(0))} 


val emp_newBC = sc.broadcast(emp_new.groupByKey.collectAsMap) 

     val joined = emp.mapPartitions(iter => for { 
      (k, v1) <- iter 
      v2 <- emp_newBC.value.getOrElse(v1, Iterable()) 
     } yield (s"$k-$v2", 1)) 

    joined.foreach(println) 

val result = joined.reduceByKey((a,b) => a+b) 

我嘗試通過從腳本看到廣播變量來管理這個問題。如果我加入RDD2(擁有250K行)與自身對在同一個分區中顯示,那麼發生較少的洗牌,因此需要3分鐘才能獲得結果。但是,在應用RDD1與RDD2時,對會通過分區分散,導致非常昂貴的洗牌過程,並且總是給出

錯誤TaskSchedulerImpl:localhost上丟失的executor驅動程序:執行程序心跳在168591 ms錯誤後超時。

根據我的結果:

  • 我應該嘗試分區文本文件中較小的塊 創建RDD1集,並分別加入那些更小的塊與RDD2?

  • 是否有另一種方法基於它們的值字段連接兩個RDD?如果我將原始值描述爲鍵並將其與連接函數結合起來,則值對再次分散在分區上,這又導致非常昂貴的減鍵操作。例如

    val RDD1 = sc.textFile("\\t1.txt",1000).map{line => val s = line.split("\t"); (s(1),s(0))} 
    
        val RDD2 = sc.textFile("\\t2.txt",1000).map{line => val s = line.split("\t"); (s(1),s(0))} 
    

    RDD1.join(RDD2).MAP(線=>(line._2,1))。reduceByKey((A,B)=>(A + B))

僞數據樣本:

KEY VALUE 
1 13894 
1 17376 
1 15688 
1 22434 
1 2282 
1 14970 
1 11549 
1 26027 
1 2895 
1 15052 
1 20815 
2 9782 
2 3393 
2 11783 
2 22737 
2 12102 
2 10947 
2 24343 
2 28620 
2 2486 
2 249 
2 3271 
2 30963 
2 30532 
2 2895 
2 13894 
2 874 
2 2021 
3 6720 
3 3402 
3 25894 
3 1290 
3 21395 
3 21137 
3 18739 
... 

一個小例子

RDD1集

2 1 
2 2 
2 3 
2 4 
2 5 
2 6 
3 1 
3 6 
3 7 
3 8 
3 9 
4 3 
4 4 
4 5 
4 6 

RDD2

21 1 
21 2 
21 5 
21 11 
21 12 
21 10 
22 7 
22 8 
22 13 
22 9 
22 11 

基於此數據JOIN結果:

(3-22,1) 
(2-21,1) 
(3-22,1) 
(2-21,1) 
(3-22,1) 
(4-21,1) 
(2-21,1) 
(3-21,1) 
(3-22,1) 
(3-22,1) 
(2-21,1) 
(3-22,1) 
(2-21,1) 
(4-21,1) 
(2-21,1) 
(3-21,1) 

REDUCEBYKEY結果:

(4-21,1) 
(3-21,1) 
(2-21,3) 
(3-22,3) 
+0

你應該展示一個使用nano-data的例子,告訴我spark-default.conf的內容是什麼 –

+0

我剛編輯過這個問題。 –

+0

我在哪裏工作我們有4臺8GB的計算機,並且我們能夠讀取4GB +文件而沒有任何問題,所以我打賭你的$ SPARK_HOME/conf/spark-defaults.conf中存在問題,再加上你沒有任何問題, t添加我用IN納米數據請求的示例 - > OUT納米數據(使其更清晰) –

回答

-1

你看着用笛卡爾加入?也許你可以嘗試像下面:

val rdd1 = sc.parallelize(for { x <- 1 to 3; y <- 1 to 5 } yield (x, y)) // sample RDD 
val rdd2 = sc.parallelize(for { x <- 1 to 3; y <- 3 to 7 } yield (x, y)) // sample RDD with slightly displaced values from the first 

val g1 = rdd1.groupByKey() 
val g2 = rdd2.groupByKey() 

val cart = g1.cartesian(g2).map { case ((key1, values1), (key2, values2)) => 
      ((key1, key2), (values1.toSet & values2.toSet).size) 
      } 

當我嘗試運行在羣集上面的例子中,我看到以下內容:

scala> rdd1.take(5).foreach(println) 
... 
(1,1) 
(1,2) 
(1,3) 
(1,4) 
(1,5) 
scala> rdd2.take(5).foreach(println) 
... 
(1,3) 
(1,4) 
(1,5) 
(1,6) 
(1,7) 
scala> cart.take(5).foreach(println) 
... 
((1,1),3) 
((1,2),3) 
((1,3),3) 
((2,1),3) 
((2,2),3) 

結果表明,對(密鑰1,密鑰2),有在這些集合之間是3個匹配元素。請注意,由於初始化的輸入元組的範圍與3個元素重疊,所以結果始終爲3。

笛卡爾轉換不會導致洗牌,因爲它只是遍歷每個RDD的元素並生成笛卡爾積。您可以通過在示例中調用toDebugString()函數來查看。

scala> val carts = rdd1.cartesian(rdd2) 
carts: org.apache.spark.rdd.RDD[((Int, Int), (Int, Int))] = CartesianRDD[9] at cartesian at <console>:25 

scala> carts.toDebugString 
res11: String = 
(64) CartesianRDD[9] at cartesian at <console>:25 [] 
| ParallelCollectionRDD[1] at parallelize at <console>:21 [] 
| ParallelCollectionRDD[2] at parallelize at <console>:21 [] 
+0

對不起,但它只是使糟糕的情況變得更糟。 – zero323

+0

@ zero323是的,是的,我沒有考慮內存的後果。如果較小的數據集有250K條記錄並且數據集大致相似,那麼較大的數據集就有大約41M條記錄。完整的笛卡爾大小約爲10TB,這是非常不可行的。 –

+0

它不是唯一的。即使只考慮網絡流量,「笛卡兒」也會更糟糕。一對一加入地圖,但笛卡爾產品地圖一對一。這意味着每個分區都必須多次移動。 – zero323