2016-11-08 70 views
1

我有一個很大的(大約85GB壓縮的)s3文件,我嘗試在AWS EMR上用Spark處理(現在有一個m4.xlarge主實例和兩個m4)。 10倍擴展核心實例,每個實例具有100 GB EBS卷)。我知道gzip是一種不可拆分的文件格式,並且I'veseenitsuggested應該重新分區壓縮文件,因爲Spark最初會給RDD提供一個分區。但是,這樣做在Spark中處理一個大的gzip文件

scala> val raw = spark.read.format("com.databricks.spark.csv"). 
    | options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")). 
    | load("s3://path/to/file.gz"). 
    | repartition(sc.defaultParallelism * 3) 
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields 
scala> raw.count() 

,並採取一看星火應用程序UI後,我仍然看到只有一個活動的執行者(其他14個都是死的)一個任務,任務無法完成(或至少我」沒有足夠的時間等待它)。

  • 這是怎麼回事?有人能幫我理解Spark在這個例子中的工作原理嗎?
  • 我應該使用不同的羣集配置嗎?
  • 不幸的是,我無法控制壓縮模式,但是有沒有另一種處理這種文件的方法?

回答

3

如果文件格式不可拆分,那麼就無法避免在一個核心上完整地讀取文件。爲了平行工作,您必須知道如何將大塊工作分配給不同的計算機。在gzip的情況下,假設你把它分成128M塊。第n塊取決於第n-1塊的位置信息,以知道如何解壓縮,這取決於第n-2塊,依此類推直到第一塊。

如果要並行化,則需要將此文件拆分。一種方法是將其解壓縮並對其進行解壓縮,或者將其解壓縮,將其分割成多個文件(每個並行任務需要一個文件),然後對每個文件進行gzip壓縮。

+1

我的印象是Spark在重新分區之前首先解壓文件。這不是這種情況嗎?那麼,我提到的四條鏈接是什麼? – user4601931

+0

是的,Spark在首先整個解壓文件(80G在一個核心上)之前,它可以混洗它來增加並行性。 – Tim

+0

好的,謝謝。你認爲我的集羣甚至能夠處理這個任務嗎?如果是這樣,如果我想解壓整個文件,重新分區,然後做進一步處理,你認爲設置'spark.dynamicAllocation.enabled = true'將確保我得到一個執行器(儘可能多的內存)到在執行處理之後,執行解壓縮,然後執行更多的執行器(內存更少但內核更多)? – user4601931