2016-11-30 143 views
10

我有一個RDD結構火花:RDD列出

RDD[(String, String)] 

和我想創建2名列表(一個用於RDD的每個維度)。我試圖使用rdd.foreach()並填充兩個ListBuffers,然後將它們轉換爲列表,但我猜每個節點都會創建自己的ListBuffer,因爲迭代後BufferLists是空的。我該怎麼做 ?

編輯:我的做法

val labeled = data_labeled.map { line => 
    val parts = line.split(',') 
    (parts(5), parts(7)) 
}.cache() 

var testList : ListBuffer[String] = new ListBuffer() 

labeled.foreach(line => 
    testList += line._1 
) 
    val labeledList = testList.toList 
    println("rdd: " + labeled.count) 
    println("bufferList: " + testList.size) 
    println("list: " + labeledList.size) 

,其結果是:

rdd: 31990654 
bufferList: 0 
list: 0 
+1

請用你的代碼更新已經嘗試過和一些輸入數據樣本和預期輸出!你的問題對我來說不是很清楚。 – eliasah

回答

9

如果你真的要創建兩個列表 - 這意味着,你希望所有的分佈式數據被收集到驅動程序應用程序(冒險緩慢或OutOfMemoryError) - 您可以使用collect,然後使用簡單的map操作結果:

val list: List[(String, String)] = rdd.collect().toList 
val col1: List[String] = list.map(_._1) 
val col2: List[String] = list.map(_._2) 

或者 - 如果你想「分」你的RDD爲兩個RDDS - 它沒有數據收集相當類似:

rdd.cache() // to make sure calculation of rdd is not repeated twice 
val rdd1: RDD[String] = rdd.map(_._1) 
val rdd2: RDD[String] = rdd.map(_._2) 

第三種方法是首先映射到這兩個RDDS和然後收集其中的每一個,但與第一個選項沒有多大區別,並且遭受相同的風險和限制。

+0

@Yuriy這裏有關廣播變量(這是隻讀的)嗎?你能更詳細地描述它嗎? – avr

+0

@avr ListBuffer是可變的,'+ ='突變內部狀態,不會創建新的引用。但是你的問題是好的,對於不可變的語句(其中引用改變任何操作)需要用一些東西(Serializable)來包裝它。 List的簡單示例:'val testList = sc.broadcast(new Serializable {var list = List.empty [String]})',並且在mutate內部狀態之後。 – Yuriy

+0

@Yuriy我認爲avr是正確的,你誤解了他/她的問題 - 這不是一個可變的與不可變的收集問題 - 廣播變量是隻讀的 - 從某種意義上說,如果它們的值在執行程序上發生變化,不會看到這種改變(Spark將如何彙總所有執行者所做的更改?)。這在本地模式下工作的事實看起來大多像一個錯誤,它不會在集羣實際分佈的地方工作。 –

1

至於Tzach瑣的回答另外,您也可以使用unzip的列表:

scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d"))) 
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27 

scala> val (l1, l2) = myRDD.collect.toList.unzip 
l1: List[String] = List(a, c) 
l2: List[String] = List(b, d) 
RDD小號

或者keysvalues

scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values) 
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33 
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33 

scala> rdd1.foreach{println} 
a 
c 

scala> rdd2.foreach{println} 
d 
b