2017-09-16 63 views
3

試想一下,我們有3個客戶,我們希望做並行他們每個人的一些同樣的工作。運行過程中出現一些火花作業同時從驅動

def doSparkJob(customerId: String) = { 
    spark 
    .read.json(s"$customerId/file.json") 
    .map(...) 
    .reduceByKey(...) 
    .write 
    .partitionBy("id") 
    .parquet("output/") 
} 

我們這樣做同時像這樣(從火花司機):

val jobs: Future[(Unit, Unit, Unit)] = for { 
    f1 <- Future { doSparkJob("customer1") } 
    f2 <- Future { doSparkJob("customer1") } 
    f3 <- Future { doSparkJob("customer1") } 
} yield (f1, f2, f3) 

Await.ready(jobs, 5.hours) 

難道我理解正確的話,這是不好的做法?許多火花的工作將從執行者中推出彼此的背景,並且會出現許多向光盤泄漏的數據。如何處理來自並行作業的執行任務的火花?當一個驅動程序有3個併發作業,並且只有3個具有一個內核的執行程序時,如何出現混洗。

我想,最好的方法應該是這樣的: 我們一起讀取所有客戶的所有數據groupByKey by customer,並做我們想做的事情。

回答

2

難道我理解正確的話,這是不好的做法?

不一定。很大程度上取決於上下文,Spark實現它自己的一組AsyncRDDActions來解決像這樣的場景(儘管沒有Dataset等效)。

在最簡單的情況下,靜態分配,這是很有可能的Spark將只安排所有的工作順序,由於缺乏資源。除非另有配置,否則這是所描述的配置最可能的結果。請記住,Spark可以使用FAIR調度程序在應用程序調度中在多個併發作業之間共享有限的資源。見Scheduling Within an Application

如果資源量足以同時啓動多個作業,可以有單獨的作業之間的競爭,尤其是對IO和內存密集型工作。如果所有作業都使用相同的資源(尤其是數據庫),Spark可能會導致限制以及隨後的故障或超時。運行多個作業不太嚴重的影響可能會增加緩存驅逐。

總的來說,在順序執行和併發執行之間需要考慮多種因素,包括但不限於可用資源(Spark集羣和外部服務),API的選擇(RDD往往比SQL更貪婪,因此需要一些低級管理)和運營商的選擇。即使作業按順序進行,您仍然可能決定使用異步方式來提高驅動程序利用率並減少延遲。這對於Spark SQL和複雜的執行計劃(Spark SQL中的常見瓶頸)特別有用。通過這種方式,Spark可以減少新的執行計劃,同時執行其他作業。