我很困惑,爲什麼在將生成的RDD轉換爲DataFrame時,Spark將使用1個任務來處理rdd.mapPartitions
。pyspark在將rdd轉換爲數據幀時使用mapPartitions的一個任務
這是我的問題,因爲我想從去:
DataFrame
- >RDD
- >rdd.mapPartitions
- >DataFrame
,這樣我可以在數據讀取(數據幀)將非SQL函數應用於數據塊(RDD上的mapPartitions),然後將其轉換回DataFrame,以便我可以使用DataFrame.write
進程。
我能夠從DataFrame - > mapPartitions,然後使用像saveAsTextFile這樣的RDD編寫器,但這並不理想,因爲DataFrame.write
進程可以執行諸如以Orc格式覆蓋和保存數據的操作。所以我想學習爲什麼這是在進行,但從實踐的角度來看,我主要關注的是能夠從DataFrame - > mapParitions - >使用DataFrame.write進程。
這是一個可重現的例子。正如所料,與100個任務爲mapPartitions
工作以下工作:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession \
.builder \
.master("yarn-client") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]})
spark_df = spark.createDataFrame(df).repartition(100)
def f(part):
return [(1,2)]
spark_df.rdd.mapPartitions(f).collect()
但是,如果最後一行更改爲類似spark_df.rdd.mapPartitions(f).toDF().show()
那麼只會對mapPartitions
工作一個任務。
調用'DataFrame.write'的結果,以及當同樣的情況。 – David
你是否在等待你的工作完成?當我執行'toDF()。collect()'時,我看到一個runJob階段也有一個任務,由'toDF'啓動以檢查結果數據框架的模式,然後是一個「collect」階段, 100個任務。 – sgvd
考慮到最終的結果是幾百GB的數據,collect()對我來說在現實生活中並不是可行的。只有1個任務運行'DataFrame.write'時作業失敗,但運行'saveAsText'時成功。我將編輯collect&show中的示例以保存數據,因爲這些示例之間可能存在差異。 – David