2016-09-15 61 views
0

元素火花Scala的框架的RDD的元件,我有一個RDD,rdd1,其中每個元素表示一個矩陣A的單個元件:火花:獲取基於陣列的另一個RDD

val rdd1 = dist.map{case (((x,y),z,v)) => ((x,y),v)}

x表示行, y表示列和 v表示矩陣A值。

我也有另一個RDD,rdd2,在RDD[index, Array[(x, y)]]的形式,其中在每個元素的數組代表該集合中的矩陣A,它們被存儲在rdd1,需要在該元素表示的特定index的元件。

現在我需要做的,就是讓矩陣A元素的值對每個index,保留所有的數據,包括index(x,y)v。這樣做會是一個好方法嗎?

回答

1

如果我理解正確的話,你的問題可以歸結爲:

val valuesRdd = sc.parallelize(Seq(
//((x, y), v) 
    ((0, 0), 5.5),    
    ((1, 0), 7.7) 
)) 

val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)]) 
    (123, Array((0, 0), (1, 0))) 
)) 

而且要合併這些RDDS來獲取所有值(index, (x, y), v),在這種情況下,(123, (0,0), 5.5)(123, (1,0), 7.7)

你絕對可以做到這一點使用join,因爲兩者RDDS有一個共同的列(x, y),但由於其中一人居然有一個Array[(x, y)]你必須爆炸是爲一組行的第一:

val explodedIndices = indicesRdd.flatMap{case (index, coords: Array[(Int, Int)]) => coords.map{case (x, y) => (index, (x, y))}} 
// Each row exploded into multiple rows (index, (x, y)) 

val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)} 
// Each row keyed by the coordinates (x, y) 

val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)} 
// Each row keyed by the coordinates (x, y) 

// Because we have common keys, we can join! 
val joined = keyedIndices.join(keyedValues) 
+0

謝謝。它在flatMap語句中使用'_'發出一個錯誤:'擴展函數類型的缺少參數...' – EdgeRover

+0

好的。它有一點修改:val explodedIndices = qual.flatMap {case(index,coords:Array [(Long,Long)])=> coords.map {case(x,y)=>(index,(x, Y))}}'。謝謝。 – EdgeRover

+0

太棒了!修正了答案,我實際上並沒有試圖運行它。 – spiffman