2017-08-16 227 views
0

我正在嘗試按特定順序將多個RDD的字符串合併到RDD行中。我試圖創建一個Map[String, RDD[Seq[String]]](其中Seq只包含一個元素),然後將它們合併到一個RDD[Row[String]],但它似乎不起作用(內容RDD[Seq[String]]丟失)。有人有什麼想法嗎?按特定順序合併多個RDD

val t1: StructType 
val mapFields: Map[String, RDD[Seq[String]]] 
var ordRDD: RDD[Seq[String]] = context.emptyRDD 
t1.foreach(field => ordRDD = ordRDD ++ mapFiels(field.name)) 
val rdd = ordRDD.map(line => Row.fromSeq(line)) 

編輯: 使用壓縮功能導致火花例外,因爲我的RDDS沒有相同數量的每個分區元素。我不知道如何確保它們在每個分區中都具有相同數量的元素,因此我只是使用索引對它們進行壓縮,然後使用ListMap以良好順序加入它們。也許有一個關於mapPartitions函數的技巧,但我還不夠了解Spark API。

val mapFields: Map[String, RDD[String]] 
var ord: ListMap[String, RDD[String]] = ListMap() 
t1.foreach(field => ord = ord ++ Map(field.name -> mapFields(field.name))) 
// Note : zip = SparkException: Can only zip RDDs with same number of elements in each partition 
//val rdd: RDD[Row] = ord.toSeq.map(_._2.map(s => Seq(s))).reduceLeft((rdd1, rdd2) => rdd1.zip(rdd2).map{ case (l1, l2) => l1 ++ l2 }).map(Row.fromSeq) 
val zipRdd = ord.toSeq.map(_._2.map(s => Seq(s)).zipWithIndex().map{ case (d, i) => (i, d) }) 
val concatRdd = zipRdd.reduceLeft((rdd1, rdd2) => rdd1.join(rdd2).map{ case (i, (l1, l2)) => (i, l1 ++ l2)}) 
val rowRdd: RDD[Row] = concatRdd.map{ case (i, d) => Row.fromSeq(d) } 
val df1 = spark.createDataFrame(rowRdd, t1) 
+0

你是什麼意思是「合併」,你的意思是每個RDD將「貢獻」一個_column_的結果?如果是這樣 - 如果不是所有的RDD都具有相同的尺寸,會發生什麼? –

+0

是的,每個「RDD」將成爲一列。 RDD應該具有相同的大小。我認爲沒有必要考慮這種情況。 – belgacea

回答

1

這裏的關鍵是使用RDD.zip爲「拉鍊」的RDDS在一起(創建RDD其中每個記錄是記錄在ELL RDDS同一指數的組合):

import org.apache.spark.sql._ 
import org.apache.spark.sql.types._ 

// INPUT: Map does not preserve order (not the defaul implementation, at least) - using Seq 
val rdds: Seq[(String, RDD[String])] = Seq(
    "field1" -> sc.parallelize(Seq("a", "b", "c")), 
    "field2" -> sc.parallelize(Seq("1", "2", "3")), 
    "field3" -> sc.parallelize(Seq("Q", "W", "E")) 
) 

// Use RDD.zip to zip all RDDs together, then convert to Rows 
val rowRdd: RDD[Row] = rdds 
    .map(_._2) 
    .map(_.map(s => Seq(s))) 
    .reduceLeft((rdd1, rdd2) => rdd1.zip(rdd2).map { case (l1, l2) => l1 ++ l2 }) 
    .map(Row.fromSeq) 

// Create schema using the column names: 
val schema: StructType = StructType(rdds.map(_._1).map(name => StructField(name, StringType))) 

// Create DataFrame: 
val result: DataFrame = spark.createDataFrame(rowRdd, schema) 

result.show 
// +------+------+------+ 
// |field1|field2|field3| 
// +------+------+------+ 
// |  a|  1|  Q| 
// |  b|  2|  W| 
// |  c|  3|  E| 
// +------+------+------+ 
+0

與我之前說的不同,似乎必須考慮rdd的大小和分區,因爲spark只能在每個分區中使用相同數量的元素壓縮RDD。否則,它可能會導致火花異常。 – belgacea