有時Spark會以低效的方式「優化」數據幀。考慮星火2.1下面的例子中(也可以在星火1.6轉載):如何防止Spark優化
val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")
val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})
val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
df_result
.coalesce(1)
.saveAsTable(tablename)
在這個例子中,我希望有一個數據幀的昂貴的改造後寫1個文件(這只是證明了問題的示例)。 Spark將coalesce(1)
向上移動,使得UDF僅應用於包含1分區的數據幀,從而破壞並行性(有趣的是,repartition(1)
不以此方式運行)。
概括地說,當我想在我的轉換的某個部分中增加並行性,但之後會減少並行性時,會發生此行爲。
我找到了一個解決方法,其包括高速緩存數據框,然後觸發數據幀的完整的評價:
val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")
val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})
val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
.cache
df_result.rdd.count // trigger computation
df_result
.coalesce(1)
.saveAsTable(tablename)
我的問題是:有另一種方式告訴星火不改變的位置一定的轉變?
簡而言之,您想*實例化一個RDD與500個分區,然後*實例化*另一個將結果合併到1個分區中,以便您可以將其保存到單個文件中 - cf. https://stackoverflow.com/questions/31383904/how-can-i-force-spark-to-execute-code >>瘋狂猜測:也許一個簡單的調用'getNumPartitions()'將足以強制實例化,沒有必須用'count()'來實際掃描結果... –
@SamsonScharfrichter不調用'getNumPartitions()'是不夠的,並且不會阻止合併被「推高」 –
巧合:我偶然發現了那個演示,來自最近的Spark Summit> https://www.slideshare.net/databricks/why-you-should-care-about-data-layout-in-the-file-system-with-cheng-lian-and-vida-公頃/ 40 –