2016-12-28 67 views
1

我正在編寫一個PySpark作業,但我遇到了一些性能問題。 基本上,它所做的只是從卡夫卡讀取事件並記錄所做的轉換。 問題是,轉換是基於對象的函數進行計算的,並且該對象相當隆起,因爲它包含一個Graph和一個自動更新的內部緩存。 所以,當我寫了下面的一段代碼:PySpark流媒體作業 - 避免對象序列化

analyzer = ShortTextAnalyzer(root_dir) 
logger.info("Start analyzing the documents from kafka") 
ssc.union(*streams).filter(lambda x: x[1] != None).foreachRDD(lambda rdd: rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1]))) 

它系列化我analyzer這需要,因爲圖的大量時間,並且當它被複制到執行,緩存是僅適用於特定的相關RDD。

如果這個工作是用Scala編寫的,我可以編寫一個對象,它存在於每個執行器中,然後我的對象不必每次都被序列化。

有沒有辦法在Python中做到這一點?爲每個執行器創建一次對象,然後避免序列化過程?

感謝提前:)

UPDATE: 我讀過後How to run a function on all Spark workers before processing data in PySpark?但答案沒有談論共享文件或廣播的變量。 我的對象不能被廣播,因爲他不是隻讀的。它不斷更新它的內部緩存,這就是爲什麼我需要每個執行程序都有一個對象(以避免序列化)。

回答

0

我最終做了什麼,避免了我的對象被序列化,把我的類變成一個靜態類 - 只有類變量和類方法。這樣每個執行器都會導入一次這個類(使用它的相關變量),不需要序列化。