2017-08-25 35 views
2

我有一個相當複雜的Apache PySpark管道,它對(很大的)一組文本文件執行幾個轉換。我的管道的預期產量是管道的不同階段。這是最好的方式(即更有效率,但更多波光粼粼的,意義在於:更適合Spark編程模型和風格)來做到這一點?保存我的Apache Spark管道的中間狀態

現在,我的代碼如下所示:

# initialize the pipeline and perform the first set of transformations. 
ctx = pyspark.SparkContext('local', 'MyPipeline') 
rdd = ctx.textFile(...).map(...).map(...) 

# first checkpoint: the `first_serialization` function serializes 
# the data into properly formatted string. 
rdd..map(first_serialization).saveAsTextFile("ckpt1") 

# here, I have to read again from the previously saved checkpoint 
# using a `first_deserialization` function that deserializes what has 
# been serialized from the `firs_serialization` function. Then performs 
# other transformations. 
rdd = ctx.textFile("ckpt1").map(...).map(...) 

等。我想擺脫序列化方法和多次保存/讀取 - 順便說一下,它會影響效率嗎?我承認是的。

任何提示? 在此先感謝。

回答

1

這似乎很簡單,因爲它是,但我會建議編寫中間階段,同時繼續重用現有的RDD(側欄:使用數據集/數據框而不是RDD獲得更多性能)並繼續處理,編寫隨時隨地取得中間結果。

當您已經處理了數據(理想情況下甚至是緩存!)以供進一步使用時,無需支付從磁盤/網絡讀取的處罰。

使用自己的代碼示例:

# initialize the pipeline and perform the first set of transformations. 
ctx = pyspark.SparkContext('local', 'MyPipeline') 
rdd = ctx.textFile(...).map(...).map(...) 

# first checkpoint: the `first_serialization` function serializes 
# the data into properly formatted string. 
string_rdd = rdd..map(first_serialization) 
string_rdd.saveAsTextFile("ckpt1") 

# reuse the existing RDD after writing out the intermediate results 
rdd = rdd.map(...).map(...) # rdd here is the same variable we used to create the string_rdd results above. alternatively, you may want to use the string_rdd variable here instead of the original rdd variable. 
+0

請你提高你的答案添加例如一些示例和/或一些參考代碼的鏈接?謝謝。 – petrux

+0

@petrux,我使用你自己的代碼提供了一個例子。我強烈建議評估如何使用Spark 2.x(2.2是本文寫作的最新版本)數據結構,如Dataset和DataFrame(在python中,只有pyspark sql DataFrame,Dataset不像Scala中那樣)。 – Garren

+0

@加倫:非常感謝。所以我只需要保存爲文本文件。好。關於火花版本,我使用2.2。但我不知道使用DataFrame是否適合我的任務。無論如何,我會看看,謝謝你的建議。 – petrux