2017-06-12 61 views
2

有時Spark會以低效的方式「優化」數據幀。考慮星火2.1下面的例子中(也可以在星火1.6轉載):如何防止Spark優化

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value") 

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d}) 

val df_result = df 
.withColumn("udfResult",expensiveUDF($"value")) 

df_result 
.coalesce(1) 
.saveAsTable(tablename) 

在這個例子中,我希望有一個數據幀的昂貴的改造後寫1個文件(這只是證明了問題的示例)。 Spark將coalesce(1)向上移動,使得UDF僅應用於包含1分區的數據幀,從而破壞並行性(有趣的是,repartition(1)不以此方式運行)。

概括地說,當我想在我的轉換的某個部分中增加並行性,但之後會減少並行性時,會發生此行爲。

我找到了一個解決方法,其包括高速緩存數據框,然後觸發數據幀的完整的評價:

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value") 

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d}) 

val df_result = df 
.withColumn("udfResult",expensiveUDF($"value")) 
.cache 

df_result.rdd.count // trigger computation 

df_result 
.coalesce(1) 
.saveAsTable(tablename) 

我的問題是:有另一種方式告訴星火不改變的位置一定的轉變?

+0

簡而言之,您想*實例化一個RDD與500個分區,然後*實例化*另一個將結果合併到1個分區中,以便您可以將其保存到單個文件中 - cf. https://stackoverflow.com/questions/31383904/how-can-i-force-spark-to-execute-code >>瘋狂猜測:也許一個簡單的調用'getNumPartitions()'將足以強制實例化,沒有必須用'count()'來實際掃描結果... –

+0

@SamsonScharfrichter不調用'getNumPartitions()'是不夠的,並且不會阻止合併被「推高」 –

+0

巧合:我偶然發現了那個演示,來自最近的Spark Summit> https://www.slideshare.net/databricks/why-you-should-care-about-data-layout-in-the-file-system-with-cheng-lian-and-vida-公頃/ 40 –

回答

3

其實這是不是因爲SparkSQL的優化,SparkSQL不會改變合併操作的位置,作爲執行計劃顯示:

Coalesce 1 
+- *Project [value#2, UDF(value#2) AS udfResult#11] 
    +- *SerializeFromObject [input[0, double, false] AS value#2] 
     +- Scan ExternalRDDScan[obj#1] 

我引述聚結API的描述一段:

注意:本段由jira SPARK-19399添加。所以它不應該在2.0的API中找到。

但是,如果您正在進行劇烈的聚結,到numPartitions = 1,這可能導致您的計算髮生在比您喜歡的更少的節點 (例如,在numPartitions = 1的情況下爲一個節點)。要避免這種情況,你可以調用重新分區。這將添加一個混洗步驟, ,但意味着當前的上游分區將並行執行 (無論當前分區是什麼)。

coalesce API不執行混洗,但會導致先前RDD和當前RDD之間的狹窄依賴關係。由於RDD是懶惰評估,計算實際上是通過合併分區完成的。

要防止它,您應該使用重新分區API。