2017-05-05 223 views
3

我想使用spark將大型(51GB)XML文件(在外部硬盤上)讀入數據幀(使用spark-xml plugin),執行簡單映射/過濾,重新排序它,然後將其作爲CSV文件寫回磁盤。在Spark中讀取大文件時發生內存不足錯誤2.1.0

但我總是得到一個java.lang.OutOfMemoryError: Java heap space無論我如何調整這一點。

我想知道爲什麼不增加分區的數量停止OOM錯誤

難道不應該分裂的任務分解成多個部分,使每個人的部分較小,不會造成內存問題?

(Spark can't possibly be trying to stuff everything in memory and crashing if it doesn't fit, right??)

事情我已經嘗試:

  • 重新分區/讀取和寫入時,當合併到(5000和10000個分區)數據幀(初始值是1604)
  • 使用數量較少的遺囑執行人(6,4,甚至執行人我得到OOM錯誤!)
  • 減少分割文件的大小(默認看起來像它的33MB)
  • 給予噸RAM(我的全部)
  • 增加spark.memory.fraction〜0.8(默認爲0.6)
  • 降低spark.memory.storageFraction〜0.2(默認爲0.5)
  • 設置spark.default.parallelism到30和40(默認爲8爲我)
  • 設置spark.files.maxPartitionBytes爲64M(默認爲128M)

我所有的代碼是在這裏(請注意,我沒有任何緩存):

val df: DataFrame = spark.sqlContext.read 
    .option("mode", "DROPMALFORMED") 
    .format("com.databricks.spark.xml") 
    .schema(customSchema) // defined previously 
    .option("rowTag", "row") 
    .load(s"$pathToInputXML") 

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 
// prints 1604 

// i pass `numPartitions` as cli arguments 
val df2 = df.coalesce(numPartitions) 

// filter and select only the cols i'm interested in 
val dsout = df2 
    .where(df2.col("_TypeId") === "1") 
    .select(
    df("_Id").as("id"), 
    df("_Title").as("title"), 
    df("_Body").as("body"), 
).as[Post] 

// regexes to clean the text 
val tagPat = "<[^>]+>".r 
val angularBracketsPat = "><|>|<" 
val whitespacePat = """\s+""".r 


// more mapping 
dsout 
.map{ 
    case Post(id,title,body,tags) => 

    val body1 = tagPat.replaceAllIn(body,"") 
    val body2 = whitespacePat.replaceAllIn(body1," ") 

    Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(",")) 

} 
.orderBy(rand(SEED)) // random sort 
.write // write it back to disk 
.option("quoteAll", true) 
.mode(SaveMode.Overwrite) 
.csv(output) 

注意

  • 輸入分流真的很小(僅33MB),所以爲什麼我不能擁有8個線程處理每一個分裂?它真的不應該吹我的記憶(我瑟

UPDATE我寫的只是讀取該文件,然後forEachPartition(的println)代碼的短版本

我得到相同的OOM錯誤:

val df: DataFrame = spark.sqlContext.read 
    .option("mode", "DROPMALFORMED") 
    .format("com.databricks.spark.xml") 
    .schema(customSchema) 
    .option("rowTag", "row") 
    .load(s"$pathToInputXML") 
    .repartition(numPartitions) 

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 

df 
    .where(df.col("_PostTypeId") === "1") 
    .select(
    df("_Id").as("id"), 
    df("_Title").as("title"), 
    df("_Body").as("body"), 
    df("_Tags").as("tags") 
).as[Post] 
    .map { 
    case Post(id, title, body, tags) => 
     Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase)) 
    } 
    .foreachPartition { rdd => 
    if (rdd.nonEmpty) { 
     println(s"HI! I'm an RDD and I have ${rdd.size} elements!") 
    } 
    } 

PS:我正在使用spark v 2.1.0。我的機器有8個內核和16 GB內存。

+0

您是否檢查過Spark UI中創建的分區的大小? – Khozzy

+0

@Khozzy這是我用1604個分區讀取DF和50個分區寫入DF時的應用程序:[screenshot-spark-ui](http://i.imgur.com/a5LjEmc。 png) –

+0

是的,但在作業執行過程中查看UI。你會發現每個任務執行的時間以及你的CPU的使用情況(可能有零星)。 – Khozzy

回答

0

因爲你是存儲您的RDD兩次 你的邏輯必須是這樣的變化或SparkSql

過濾
val df: DataFrame = SparkFactory.spark.read 
     .option("mode", "DROPMALFORMED") 
     .format("com.databricks.spark.xml") 
     .schema(customSchema) // defined previously 
     .option("rowTag", "row") 
     .load(s"$pathToInputXML") 
     .coalesce(numPartitions) 

    println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 
    // prints 1604 


    // regexes to clean the text 
    val tagPat = "<[^>]+>".r 
    val angularBracketsPat = "><|>|<" 
    val whitespacePat = """\s+""".r 

    // filter and select only the cols i'm interested in 
    df 
     .where(df.col("_TypeId") === "1") 
     .select(
     df("_Id").as("id"), 
     df("_Title").as("title"), 
     df("_Body").as("body"), 
    ).as[Post] 
     .map{ 
     case Post(id,title,body,tags) => 

      val body1 = tagPat.replaceAllIn(body,"") 
      val body2 = whitespacePat.replaceAllIn(body1," ") 

      Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(",")) 

     } 
     .orderBy(rand(SEED)) // random sort 
     .write // write it back to disk 
     .option("quoteAll", true) 
     .mode(SaveMode.Overwrite) 
     .csv(output) 
+0

讓所有的DF都沒有幫助..我仍然有'java.lang.OutOfMemoryError:Java heap space' –

-2

您可以通過添加更改堆大小在您的環境變量如下:

  1. 環境變量名稱:_JAVA_OPTIONS
  2. 環境變量值:-Xmx512M -Xms512m