我有一個火花任務,它接收來自hdfs的8條記錄的文件,做一個簡單的聚合並將其保存回Hadoop。我注意到當我這樣做時,有幾百個任務。爲什麼我的火花任務有這麼多任務?
我也不確定爲什麼有這樣的多個工作?我覺得工作更像是什麼時候發生的事情。我可以推測爲什麼 - 但我的理解是,在這個代碼中,它應該是一項工作,它應該分解成多個階段,而不是多個工作。爲什麼它不把它分解成幾個階段,它怎麼分解成工作?
至於200個加任務,因爲數據量和節點的量是微乎其微的,它沒有任何意義,有像25個任務每行數據時,只有一個聚合和幾個過濾器。爲什麼每個分區每個原子操作只有一個任務?
下面是相關Scala代碼 -
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object TestProj {object TestProj {
def main(args: Array[String]) {
/* set the application name in the SparkConf object */
val appConf = new SparkConf().setAppName("Test Proj")
/* env settings that I don't need to set in REPL*/
val sc = new SparkContext(appConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
/*the below rdd will have schema defined in Record class*/
val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
.map(x=>x.split(" ")) //file record into array of strings based spaces
.map(x=>Record(
x(0).toInt,
x(1).asInstanceOf[String],
x(2).asInstanceOf[String],
x(3).toInt
))
/* the below dataframe groups on first letter of first name and counts it*/
val aggDF = rddCase.toDF()
.groupBy($"firstName".substr(1,1).alias("firstLetter"))
.count
.orderBy($"firstLetter")
/* save to hdfs*/
aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")
}
case class Record(id: Int
, firstName: String
, lastName: String
, quantity:Int)
}
以下是屏幕的第一部分,當點擊超過200個任務的舞臺時
按照要求,這裏有招聘ID階段1
這裏是第E詳細爲作業ID 1階段200個任務
感謝的人!我會立即做這件事檢查出來。那麼多重工作呢?爲什麼有兩份工作? –
你有沒有作業ID 1階段的屏幕? – marios
我將它們添加到OP –