2016-11-22 212 views
14

我很困惑,爲什麼在將生成的RDD轉換爲DataFrame時,Spark將使用1個任務來處理rdd.mapPartitionspyspark在將rdd轉換爲數據幀時使用mapPartitions的一個任務

這是我的問題,因爲我想從去:

DataFrame - >RDD - >rdd.mapPartitions - >DataFrame

,這樣我可以在數據讀取(數據幀)將非SQL函數應用於數據塊(RDD上的mapPartitions),然後將其轉換回DataFrame,以便我可以使用DataFrame.write進程。

我能夠從DataFrame - > mapPartitions,然後使用像saveAsTextFile這樣的RDD編寫器,但這並不理想,因爲DataFrame.write進程可以執行諸如以Orc格式覆蓋和保存數據的操作。所以我想學習爲什麼這是在進行,但從實踐的角度來看,我主要關注的是能夠從DataFrame - > mapParitions - >使用DataFrame.write進程。

這是一個可重現的例子。正如所料,與100個任務爲mapPartitions工作以下工作:

from pyspark.sql import SparkSession 
import pandas as pd 

spark = SparkSession \ 
    .builder \ 
    .master("yarn-client") \ 
    .enableHiveSupport() \ 
    .getOrCreate() 

sc = spark.sparkContext 

df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]}) 
spark_df = spark.createDataFrame(df).repartition(100) 

def f(part): 
    return [(1,2)] 

spark_df.rdd.mapPartitions(f).collect() 

但是,如果最後一行更改爲類似spark_df.rdd.mapPartitions(f).toDF().show()那麼只會對mapPartitions工作一個任務。

一些截圖示出了該下面: enter image description here enter image description here

回答

5

DataFrame.show()僅示出了數據幀的行的第一數量,默認情況下只有第一20.如果該數目大於每個分區的行數小, Spark是懶惰的,只評估一個分區,相當於一個單獨的任務。

您也可以在數據框上執行collect,以計算並收集所有分區並再次查看100個任務。

您仍然會像以前一樣先看到runJob任務,這是由toDF調用引起的,可以確定結果數據框架構:它需要處理單個分區以確定映射的輸出類型功能。在此初始階段之後,所有部分都會發生collect等實際操作。例如,對於我跑過來,在這些階段與spark_df.rdd.mapPartitions(f).toDF().collect()結果替換最後一行的代碼片段:

enter image description here

+0

調用'DataFrame.write'的結果,以及當同樣的情況。 – David

+0

你是否在等待你的工作完成?當我執行'toDF()。collect()'時,我看到一個runJob階段也有一個任務,由'toDF'啓動以檢查結果數據框架的模式,然後是一個「collect」階段, 100個任務。 – sgvd

+1

考慮到最終的結果是幾百GB的數據,collect()對我來說在現實生活中並不是可行的。只有1個任務運行'DataFrame.write'時作業失敗,但運行'saveAsText'時成功。我將編輯collect&show中的示例以保存數據,因爲這些示例之間可能存在差異。 – David