2016-11-23 60 views
5

我想知道什麼是不同的使用火花mapPartitions功能與瞬態懶惰val。
由於每個分區基本上都在不同的節點上運行,因此每個節點上都會創建一個臨時lazy val實例(假設它在一個對象中)。Spark mapPartitions vs transient lazy val

例如:

class NotSerializable(v: Int) { 
    def foo(a: Int) = ??? 
} 

object OnePerPartition { 
    @transient lazy val obj: NotSerializable = new NotSerializable(10) 
} 

object Test extends App{ 
    val conf = new SparkConf().setMaster("local[2]").setAppName("test") 
    val sc = new SparkContext(conf) 

    val rdd: RDD[Int] = sc.parallelize(1 to 100000) 

    rdd.map(OnePerPartition.obj.foo) 

    // ---------- VS ---------- 

    rdd.mapPartitions(itr => { 
     val obj = new NotSerializable(10) 
     itr.map(obj.foo) 
    }) 
} 

也許有人會問,爲什麼你甚至想它...
我想創建一個普通的容器的概念對任何泛型集合實現(RDD運行我的邏輯,Listscalding pipe等)
他們都有一個「地圖」的概念,但mapPartition是唯一的spark

回答

2

首先你不需要transientlazy這裏。使用object包裝就足以使這項工作實際上,你可以寫爲:

object OnePerExecutor { 
    val obj: NotSerializable = new NotSerializable(10) 
} 

有對象封裝和初始化NotSerializablemapPartitions之間的根本區別。這:

rdd.mapPartitions(iter => { 
    val ns = NotSerializable(1) 
    ??? 
}) 

創建每個分區的單一NotSerializable實例。

另一方面,對象包裝爲每個執行器JVM創建一個單一的NotSerializable實例。結果如下:

  • 可用於處理多個分區。
  • 可以被多個執行程序線程同時訪問。
  • 壽命超過使用它的函數調用。

這意味着它應該是線程安全的,任何方法調用應該是副作用免費。