0

在運行Apache Spark作業時遇到的問題之一是將RDD中的每個元素相互相乘。 簡單地說,我希望做一些類似的,將Spark RDD中的元素互相相加

enter image description here

目前,我這樣做是使用2次迭代的每個「的foreach」。我的直覺是,這可以以高效的方式完成。

for (elementOutSide <- iteratorA) { 
    for (elementInside <- iteratorB) { 
    if (!elementOutSide.get(3).equals(elementInside.get(3))) { 
     val multemp = elementInside.getLong(3) * elementOutSide.getLong(3) 
     .... 
     ... 

}}} 

誰能幫我糾正和改善這種情況?提前致謝 .. !!

+0

我認爲你正在尋找一個普通的笛卡爾連接。 – Alec

+1

順便說一句,你的實現並不真正符合要求 - 它比較了實際的_elements_而不是它們的_indices_--,當且僅當原始RDD的記錄是_unique_時,它才起作用。 –

+0

它們是唯一的,RDD是使用保證的sql查詢構建的。 – Infamous

回答

1

正如評論指出的那樣,這是一個笛卡爾連接。下面是它可以在一個RDD[(Int, String)],在這裏我們感興趣的是每兩個不相同的Int S的乘法來完成:

val rdd: RDD[(Int, String)] = sc.parallelize(Seq(
    (1, "aa"), 
    (2, "ab"), 
    (3, "ac") 
)) 

// use "cartesian", then "collect" to map only relevant results 
val result: RDD[Int] = rdd.cartesian(rdd).collect { 
    case ((t1: Int, _), (t2: Int, _)) if t1 != t2 => t1 * t2 
} 

注:此實現假定輸入記錄是獨一無二的,因爲指示。如果它們不是,則可以在比較指數而不是數值的同時執行笛卡爾連接和rdd.zipWithIndex結果的映射。