我有兩個RDD都有兩列(K,V)。在那些RDD的密鑰源中,密鑰顯示爲一個在另一個之下,並且對於每一行,將不同且不同的值分配給密鑰。這篇文章的底部給出了創建RDD的文本文件。是否有可能加入兩個rdds的值以避免昂貴的洗牌?
兩個RDD中的鍵完全不同,我想根據它們的值加入兩個RDD,並嘗試查找每對存在多少個常見值。例如我試圖達到諸如(1-5,10)的結果,這意味着來自RDD1的關鍵值「1」和來自RDD2的關鍵值「5」共享共同的10個值。
我在256 GB RAM和72核心的單臺機器上工作。一個文本文件是500 MB,而另一個是3 MB。
這裏是我的代碼:
val conf = new SparkConf().setAppName("app").setMaster("local[*]").set("spark.shuffle.spill", "true")
.set("spark.shuffle.memoryFraction", "0.4")
.set("spark.executor.memory","128g")
.set("spark.driver.maxResultSize", "0")
val RDD1 = sc.textFile("\\t1.txt",1000).map{line => val s = line.split("\t"); (s(0),s(1))}
val RDD2 = sc.textFile("\\t2.txt",1000).map{line => val s = line.split("\t"); (s(1),s(0))}
val emp_newBC = sc.broadcast(emp_new.groupByKey.collectAsMap)
val joined = emp.mapPartitions(iter => for {
(k, v1) <- iter
v2 <- emp_newBC.value.getOrElse(v1, Iterable())
} yield (s"$k-$v2", 1))
joined.foreach(println)
val result = joined.reduceByKey((a,b) => a+b)
我嘗試通過從腳本看到廣播變量來管理這個問題。如果我加入RDD2(擁有250K行)與自身對在同一個分區中顯示,那麼發生較少的洗牌,因此需要3分鐘才能獲得結果。但是,在應用RDD1與RDD2時,對會通過分區分散,導致非常昂貴的洗牌過程,並且總是給出
錯誤TaskSchedulerImpl:localhost上丟失的executor驅動程序:執行程序心跳在168591 ms錯誤後超時。
根據我的結果:
我應該嘗試分區文本文件中較小的塊 創建RDD1集,並分別加入那些更小的塊與RDD2?
是否有另一種方法基於它們的值字段連接兩個RDD?如果我將原始值描述爲鍵並將其與連接函數結合起來,則值對再次分散在分區上,這又導致非常昂貴的減鍵操作。例如
val RDD1 = sc.textFile("\\t1.txt",1000).map{line => val s = line.split("\t"); (s(1),s(0))} val RDD2 = sc.textFile("\\t2.txt",1000).map{line => val s = line.split("\t"); (s(1),s(0))}
RDD1.join(RDD2).MAP(線=>(line._2,1))。reduceByKey((A,B)=>(A + B))
僞數據樣本:
KEY VALUE
1 13894
1 17376
1 15688
1 22434
1 2282
1 14970
1 11549
1 26027
1 2895
1 15052
1 20815
2 9782
2 3393
2 11783
2 22737
2 12102
2 10947
2 24343
2 28620
2 2486
2 249
2 3271
2 30963
2 30532
2 2895
2 13894
2 874
2 2021
3 6720
3 3402
3 25894
3 1290
3 21395
3 21137
3 18739
...
一個小例子
RDD1集
2 1
2 2
2 3
2 4
2 5
2 6
3 1
3 6
3 7
3 8
3 9
4 3
4 4
4 5
4 6
RDD2
21 1
21 2
21 5
21 11
21 12
21 10
22 7
22 8
22 13
22 9
22 11
基於此數據JOIN結果:
(3-22,1)
(2-21,1)
(3-22,1)
(2-21,1)
(3-22,1)
(4-21,1)
(2-21,1)
(3-21,1)
(3-22,1)
(3-22,1)
(2-21,1)
(3-22,1)
(2-21,1)
(4-21,1)
(2-21,1)
(3-21,1)
REDUCEBYKEY結果:
(4-21,1)
(3-21,1)
(2-21,3)
(3-22,3)
你應該展示一個使用nano-data的例子,告訴我spark-default.conf的內容是什麼 –
我剛編輯過這個問題。 –
我在哪裏工作我們有4臺8GB的計算機,並且我們能夠讀取4GB +文件而沒有任何問題,所以我打賭你的$ SPARK_HOME/conf/spark-defaults.conf中存在問題,再加上你沒有任何問題, t添加我用IN納米數據請求的示例 - > OUT納米數據(使其更清晰) –