2016-06-11 108 views
12

我有一個火花任務,它接收來自hdfs的8條記錄的文件,做一個簡單的聚合並將其保存回Hadoop。我注意到當我這樣做時,有幾百個任務。爲什麼我的火花任務有這麼多任務?

我也不確定爲什麼有這樣的多個工作?我覺得工作更像是什麼時候發生的事情。我可以推測爲什麼 - 但我的理解是,在這個代碼中,它應該是一項工作,它應該分解成多個階段,而不是多個工作。爲什麼它不把它分解成幾個階段,它怎麼分解成工作?

至於200個加任務,因爲數據量和節點的量是微乎其微的,它沒有任何意義,有像25個任務每行數據時,只有一個聚合和幾個過濾器。爲什麼每個分區每個原子操作只有一個任務?

下面是相關Scala代碼 -

import org.apache.spark.sql._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object TestProj {object TestProj { 
    def main(args: Array[String]) { 

    /* set the application name in the SparkConf object */ 
    val appConf = new SparkConf().setAppName("Test Proj") 

    /* env settings that I don't need to set in REPL*/ 
    val sc = new SparkContext(appConf) 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt") 

    /*the below rdd will have schema defined in Record class*/ 
    val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt") 
         .map(x=>x.split(" ")) //file record into array of strings based spaces 
         .map(x=>Record(
            x(0).toInt, 
            x(1).asInstanceOf[String], 
            x(2).asInstanceOf[String], 
            x(3).toInt 
            )) 


    /* the below dataframe groups on first letter of first name and counts it*/ 
    val aggDF = rddCase.toDF() 
         .groupBy($"firstName".substr(1,1).alias("firstLetter")) 
         .count 
         .orderBy($"firstLetter") 

    /* save to hdfs*/ 
    aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg") 

    } 

    case class Record(id: Int 
        , firstName: String 
        , lastName: String 
        , quantity:Int) 

} 

下面是截圖點擊應用後 enter image description here

下面是查看ID 0的特定的「工作」時,階段表現 enter image description here

以下是屏幕的第一部分,當點擊超過200個任務的舞臺時

enter image description here

這是舞臺enter image description here

下面屏幕裏的第二部分是點擊「執行者」標籤後 enter image description here

按照要求,這裏有招聘ID階段1

enter image description here

這裏是第E詳細爲作業ID 1階段200個任務

enter image description here

回答

17

這是一個經典的Spark問題。

用於讀取的兩個任務(第二個圖中的階段ID 0)是defaultMinPartitions設置,設置爲2.您可以通過讀取REPL sc.defaultMinPartitions中的值來獲取此參數。它也應該可以在Spark UI的「環境」下點擊。

你可以看看github上的code,看看到底發生了什麼。如果您想在讀取時使用更多的分區,只需將其添加爲參數,例如sc.textFile("a.txt", 20)

現在有趣的部分來自於第二階段出現的200個分區(第二個圖中的階段Id 1)。那麼,每次洗牌時,Spark都需要決定洗牌RDD有多少個分區。你可以想象,默認值是200。如果你有這個配置,你會看到,200個分區不會是有什麼比較運行代碼

sqlContext.setConf("spark.sql.shuffle.partitions", "4」) 

你可以改變使用。如何設置這個參數是一種藝術。也許選擇兩倍的核心數量(或其他)。

我認爲Spark 2.0有一種方法可以自動推斷洗牌RDD的最佳分區數量。期待!

最後,您得到的工作量與產生的優化的Dataframe代碼導致的多少RDD操作有關。如果您閱讀Spark規範,則說明每個RDD操作都會觸發一項工作。當您的操作涉及Dataframe或SparkSQL時,Catalyst優化器將找出執行計劃並生成一些基於RDD的代碼來執行它。很難說出爲什麼它在你的情況下使用兩個動作。您可能需要查看優化的查詢計劃,以確切瞭解正在執行的操作。

+0

感謝的人!我會立即做這件事檢查出來。那麼多重工作呢?爲什麼有兩份工作? –

+1

你有沒有作業ID 1階段的屏幕? – marios

+0

我將它們添加到OP –

1

我有類似的問題。但在我的場景中,我並行化的集合的元素少於Spark計劃的任務數量(導致Spark有時會出現奇怪行爲)。使用強制分區號我能夠解決這個問題。

它是這樣的:

collection = range(10) # In the real scenario it was a complex collection 
sc.parallelize(collection).map(lambda e: e + 1) # also a more complex operation in the real scenario 

然後,我在Spark日誌看到:

INFO YarnClusterScheduler: Adding task set 0.0 with 512 tasks