2017-09-08 101 views
0

我有一個RDD這是類似的,如何將RDD中的每一行與對方相乘?

CELL-ID | COUNT 
-------------- 
abcd  10 
DEF  20 
ghi  15 

我需要一個RDD與

CELL-ID-1 | CELL-ID-2 | PRODUCT 
-------------- 
abcd  DEF   200 
abcd  ghi   150 
DEF  abcd   200 
DEF  ghi   300 
... 
.... 

如何才能做到這一點?我綁在使用笛卡爾積,但無法得到的輸出

val result = orginalRDD.cartesian(orginalRDD).collect { 
    case ((t1: _,Int), (t2: _,Int)) if t1 != t2 => t1 * t2 
} 
+0

如果'originalRDD'是一個很大的數據集,'.collect()'可能不適合驅動程序的內存。你沒有提到爲什麼你不能得到輸出,所以我們只能推測。 –

+0

這是相當大的RDD,6M +記錄,如果此代碼被移植到「地圖」或其他東西 – Infamous

+0

其實,我錯了......請參閱@ tzach-zohar的回覆我對他的回答的評論。 –

回答

4

您可以讓t1t2代表元組(整個「記錄」):

val result = orginalRDD.cartesian(orginalRDD).collect { 
    case (t1: (String ,Int), t2: (String ,Int)) if t1 != t2 => (t1._1, t2._1, t1._2 * t2._2) 
} 

或者,您可以做同樣的事情,但使用模式匹配將它們進一步分解:

val result = orginalRDD.cartesian(orginalRDD).collect { 
    case ([email protected](s1 ,i1), [email protected](s2, i2)) if t1 != t2 => (s1, s2, i1 * i2) 
} 

您的解決方案就像試圖同時做兩個一樣......

+0

我建議在這裏使用'.map()'而不是'.collect()'。 '.collect()'調用將過濾後的笛卡爾結果返回到驅動程序的堆中。這個玩具數據集不是問題,但是對於大型分佈式數據集,結果會相當大。 –

+1

@TravisHegner這是不正確的 - 雖然名稱可能看起來很混亂,但RDD.collect這個特定的重載不會將數據發送回驅動程序,這是通過應用部分函數的簡單映射和過濾。實際上,它的實現和'filter(f.isDefinedAt).map(f)'一樣簡單,見https://github.com/apache/spark/blob/cd0a08361e2526519e7c131c42116bf56fa62c76/core/src/main/scala/org/apache /spark/rdd/RDD.scala#L958-L961 –

+0

我的錯誤,並感謝您的澄清。 –

相關問題