2016-11-05 46 views
13

Spark專家的一個很好的問題。Spark執行程序上的對象緩存

我正在處理map操作(RDD)中的數據。在映射函數中,我需要查找A類的對象以用於處理RDD中的元素。

由於這將在執行器上執行,並且創建類型爲A(將被查找)的元素恰好是一個昂貴的操作,我想要在每個執行器上預加載和緩存這些對象。做這件事的最好方法是什麼?

  • 一個想法是廣播查找表,但類A是不可序列(在其沒有實施控制)。

  • 另一個想法是將它們加載到單例對象中。但是,我想控制裝入該查找表的內容(例如,不同的Spark作業中可能有不同的數據)。

理想情況下,我需要指定哪些將在執行人一次裝入(包括流的情況下,使查找錶停留在批次之間的內存),通過將可在駕駛過程中的參數它的啓動,在任何數據被處理之前。

是否有乾淨優雅的做法,或者它不可能實現?

+0

爲什麼沒有查找表也分佈?所以你可以使用DataFrames來連接兩組數據?如果總是需要查找數據,那麼每次需要運行計算時都需要承受廣播數據的費用? – DevZer0

+1

@ DevZer0 _A不是serializable_。 – 2016-11-05 14:07:25

回答

3

這正是broadcast.的目標用例。廣播變量只發送一次,並使用種子高效地移動到所有執行程序,並保留在內存/本地磁盤中,直到不再需要它們爲止。

序列化在使用其他接口時經常會彈出一個問題。如果你可以強制你使用的對象是可序列化的,那將是最好的解決方案。如果這是不可能的,你的生活會變得更復雜一些。如果無法序列​​化A對象,則必須在執行器上爲每項任務創建它們。如果他們存儲在文件中的某個地方,這看起來是這樣的:

rdd.mapPartitions { it => 
    val lookupTable = loadLookupTable(path) 
    it.map(elem => fn(lookupTable, elem)) 
} 

請注意,如果你採用這種模式,那麼你必須每一次任務加載查找表 - 你不能受益於廣播變量的跨任務持久性。

編輯:這裏是另一個模型,我相信可以讓您在每個JVM的任務之間共享查找表。

class BroadcastableLookupTable { 
    @transient val lookupTable: LookupTable[A] = null 

    def get: LookupTable[A] = { 
    if (lookupTable == null) 
     lookupTable = < load lookup table from disk> 
    lookupTable 
    } 
} 

這個類可以廣播(沒有實質性的傳輸),並且第一次調用每個JVM時,您將加載查找表並返回它。

+0

不幸的是,這些對象是不可序列化的,所以我們確實需要採用第二種方法,就像你所描述的那樣。但是,我們還必須能夠跨任務共享查找表。 – DruckerBg

+0

爲什麼你需要分享任務?你是否正在更新地圖操作中的查找表? – Tim

+1

增加了一種可能的方式來做到這一點。 – Tim

3

如果序列化結果不可能,那麼如何將查找對象存儲在數據庫中?這不是最簡單的解決方案,但應該可以正常工作。我可以推薦檢查例如spark-redis,但我確定有更好的解決方案。

+0

謝謝,這是一個不錯的解決方案。一個問題是這些實際上是JVM中的一些對象。 – DruckerBg

+0

我更新了這個問題,包括:「...創建類型A(將被查找)的元素碰巧是一個昂貴的操作...」 – DruckerBg

+0

如何將JVM對象存儲爲字節數組,Redis? –

0

由於A不是可序列化的,因此最簡單的解決方案是創建自己的可序列化類型A1,其中包含計算所需的所有A數據。然後在廣播中使用新的查找表。