2017-09-22 73 views
0

我想加入2 PairRDD在火花和不知道如何迭代結果。斯卡拉 - 火花 - 迭代結合對RDD

val input1 = sc.textFile(inputFile1) 
val input2 = sc.textFile(inputFile2) 

val pairs = input1.map(x => (x.split("\\|")(18),x)) 
val groupPairs = pairs.groupByKey() 

val staPairs = input2.map(y => (y.split("\\|")(0),y)) 
val stagroupPairs = staPairs.groupByKey() 

val finalJoined = groupPairs.leftOuterJoin(stagroupPairs) 

finalJoined是類型finalJoined的:

org.apache.spark.rdd.RDD[(String, (Iterable[String], Option[Iterable[String]]))] 

當我做finalJoined.collect().foreach(println)我看到下面的輸出:

(key1,(CompactBuffer(val1a,val1b),Some(CompactBuffer(val1))) 
(key2,(CompactBuffer(val2a,val2b),Some(CompactBuffer(val2))) 

我想輸出是

key1

val1a+"|"+val1 

val1b+"|"+val1 

KEY2

val2a+"|"+val2 

回答

0

兩者上RDDS避免groupByKey步驟和執行加入上直接對和starpairs..you將獲得期望的結果。

對於e.g,

val rdd1 = sc.parallelize(Array("key1,val1a","key1,val1b","key2,val2a","key2,val2b").toSeq) 
val rdd2 = sc.parallelize(Array("key1,val1","key2,val2").toSeq) 
val pairs= rdd1.map(_.split(",")).map(x => (x(0),x(1))) 
val starPairs= rdd2.map(_.split(",")).map(x => (x(0),x(1))) 
val res = pairs.join(starPairs) 
res.foreach(println) 

(key1,(val1a,val1)) 
(key1,(val1b,val1)) 
(key2,(val2a,val2)) 
(key2,(val2b,val2)) 
+0

感謝。我刪除了groupBy,並使用額外的格式獲得了預期的結果。 – Nats