2015-10-19

Flatten elements in Spark Scala

I have data in an RDD of the following type:

RDD[((Int, Int, Int), ((Int, Int), Int))] 

for example:

(((9,679,16),((2,274),1)), ((250,976,13),((2,218),1))) 

I want the output to be:

((9,679,16,2,274,1),(250,976,13,2,218,1)) 

This comes after joining two RDDs:

val joinSale = salesTwo.join(saleFinal) 

which gave me the result set above. I tried the following code:

joinSale.flatMap(x => x).take(100).foreach(println) 

I have tried map/flatMap but couldn't make it work. Any ideas how to achieve this? Thanks in advance..


Tuples don't have a flatten method. You can, however, use `productIterator` to flatten them - http://stackoverflow.com/questions/5289408/iterate-over-a-tuple - but it is really ugly. – tuxdna
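As a rough sketch of the `productIterator` route mentioned in the comment (the helper `deepFlatten` is a hypothetical name, not from the original post), one could recursively walk any `Product`, at the cost of losing the static tuple types:

```scala
// Hypothetical sketch of the productIterator approach from the comment.
// Recursively flattens nested tuples, but the result is untyped (Any),
// and it would also descend into case classes, since they are Products too.
def deepFlatten(x: Any): Iterator[Any] = x match {
  case p: Product => p.productIterator.flatMap(deepFlatten)
  case other      => Iterator(other)
}

val flat = deepFlatten(((9, 679, 16), ((2, 274), 1))).toList
// List(9, 679, 16, 2, 274, 1)
```

This illustrates why the approach is "ugly": you trade type safety for genericity, which is what the pattern-matching and shapeless answers below avoid.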

Answers


You can do this with pattern matching in Scala. Just wrap your tuple-reshaping logic inside a map, like the following:

val mappedJoinSale = joinSale.map { case ((a, b, c), ((d, e), f)) => (a, b, c, d, e, f) } 

Using your example, we have:

scala> val example = sc.parallelize(Array(((9,679,16),((2,274),1)), ((250,976,13),((2,218),1)))) 
example: org.apache.spark.rdd.RDD[((Int, Int, Int), ((Int, Int), Int))] = ParallelCollectionRDD[0] at parallelize at <console>:12 

scala> val mapped = example.map { case ((a, b, c), ((d, e), f)) => (a, b, c, d, e, f) } 
mapped: org.apache.spark.rdd.RDD[(Int, Int, Int, Int, Int, Int)] = MappedRDD[1] at map at <console>:14 

scala> mapped.take(2).foreach(println) 
... 
(9,679,16,2,274,1) 
(250,976,13,2,218,1) 

Wow, now I feel stupid.. I did implement the map-with-case approach a while back.. I must have forgotten. Thanks anyway.. – Achillies57


@Achillies57 No worries, we all forget things :) –


You can also create a generic tuple flattener using the wonderful shapeless library, as follows:

import shapeless._ 
import shapeless.ops.tuple 
trait LowLevelFlatten extends Poly1 { 
    implicit def anyFlat[T] = at[T](x => Tuple1(x)) 
} 

object concat extends Poly2 { 
    implicit def atTuples[T1, T2](implicit prepend: tuple.Prepend[T1, T2]): Case.Aux[T1, T2, prepend.Out] = 
    at[T1,T2]((t1,t2) => prepend(t1,t2)) 
} 

object flatten extends LowLevelFlatten { 
    implicit def tupleFlat[T, M](implicit 
           mapper: tuple.Mapper.Aux[T, flatten.type, M], 
           reducer: tuple.LeftReducer[M, concat.type] 
           ): Case.Aux[T, reducer.Out] = 
    at[T](t => reducer(mapper(t))) 
} 
Now, in any code where `import shapeless._` is in scope, you can use it as:

joinSale.map(flatten)