According to the Spark source code comments, SparkContext.parallelize acts lazily, but I can't explain the behavior I'm seeing.

SparkContext.scala has:

/** Distribute a local Scala collection to form an RDD.
  *
  * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
  * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
  * modified collection. Pass a copy of the argument to avoid this.
  * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
  * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
  */
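
(As an aside, the second note is easy to check in spark-shell. A minimal sketch, assuming the shell's SparkContext sc and default parallelism; the partition count follows defaultParallelism, which is 8 in the session below:

val e1 = sc.emptyRDD[String]            // EmptyRDD: no partitions at all
val e2 = sc.parallelize(Seq[String]())  // ParallelCollectionRDD: empty partitions
e1.partitions.length                    // 0
e2.partitions.length                    // defaultParallelism, e.g. 8
)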

So I thought I would run a simple test:

scala> var c = List("a0", "b0", "c0", "d0", "e0", "f0", "g0") 
c: List[String] = List(a0, b0, c0, d0, e0, f0, g0) 

scala> var crdd = sc.parallelize(c) 
crdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26 

scala> c = List("x1", "y1") 
c: List[String] = List(x1, y1) 

scala> crdd.foreach(println) 
[Stage 0:>               (0 + 0)/8]d0 
a0 
b0 
e0 
f0 
g0 
c0 

scala> 

Based on the lazy behavior of parallelize, I expected crdd.foreach(println) to output "x1" and "y1".

What am I doing wrong?

Answer


You didn't modify c at all; you reassigned it to a new list.

That point aside, look at the note again:

"If `seq` is a mutable collection"

Scala's List is not a mutable collection,

"and is altered after the call to parallelize and before the first action on the RDD"

and, well, you never actually altered the list.
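
The distinction shows up in plain Scala, with no Spark involved (a minimal sketch; the names xs and buf and the sample values are just for illustration):

var xs = List(1, 2, 3)
val captured = xs            // captured points at the original List object
xs = List(9, 9)              // rebinds the variable xs; the original List is untouched
println(captured)            // List(1, 2, 3)

val buf = scala.collection.mutable.ListBuffer(1, 2, 3)
val capturedBuf = buf        // same ListBuffer object
buf += 4                     // mutates that object in place
println(capturedBuf)         // ListBuffer(1, 2, 3, 4)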


Here is a proper example of the documented behavior:

scala> val c = scala.collection.mutable.ListBuffer(1, 2, 3) 
c: scala.collection.mutable.ListBuffer[Int] = ListBuffer(1, 2, 3) 

scala> val cRDD = sc.parallelize(c) 
cRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29 

scala> c.append(4) 

scala> c 
res7: scala.collection.mutable.ListBuffer[Int] = ListBuffer(1, 2, 3, 4) 

scala> cRDD.collect() 
res8: Array[Int] = Array(1, 2, 3, 4) 
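
Following the note's advice to "pass a copy of the argument", parallelizing a snapshot of the buffer avoids the surprise (a minimal sketch along the same lines):

val d = scala.collection.mutable.ListBuffer(1, 2, 3)
val dRDD = sc.parallelize(d.toList)   // snapshot the buffer into an immutable List
d.append(4)                           // later mutation no longer reaches the RDD
dRDD.collect()                        // Array(1, 2, 3)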

That explains it. Thanks.