2016-04-29 58 views
13

我試圖使用Spark 1.6.1parquet文件寫入Amazon S3。我生成的小parquet是一次寫入~2GB,所以它不是那麼多的數據。我試圖證明Spark作爲我可以使用的平臺。使用Spark在s3a上向s3寫入實木複合地址文件非常慢

基本上我正在設置一個star schemadataframes,然後我將這些表寫出來實木複合地板。數據來自供應商提供的csv文件,我使用Spark作爲ETL平臺。我目前在ec2(r3.2xlarge)有3個節點的集羣,所以120GB的執行內存和總共16個內核。

輸入文件總計大約22GB,現在我正在提取大約2GB的數據。最終,當我開始加載完整數據集時,這將會有很多TB。

這裏是我的火花/斯卡拉pseudocode

def loadStage(): Unit = { 
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData") 
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter") 
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false") 
    var sqlCtx = new SQLContext(sc) 


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz") 

    //Setup header table/df 
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1") 
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....." 
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false))) 
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6))) 
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema) 
    header.registerTempTable("header") 
    sqlCtx.cacheTable("header") 


    //Setup fact table/df 
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2") 
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....." 
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false))) 
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10))) 
    val df = sqlCtx.createDataFrame(records, factSchema) 
    df.registerTempTable("fact") 

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date") 


    println(results.count()) 



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet") 


    } 

計大約需要2分鐘465884512行。到地板的寫操作38分鐘

我明白​​3210做了洗牌;但確實寫的驅動程序....但時間它的服用量是讓我覺得我做的事情嚴重錯誤。如果沒有​​3210,這仍然需要15分鐘,該IMO仍然太長,並給我一噸小的parquet文件。我想每天有一個大文件,我將擁有。我的代碼也是通過字段值進行分區,並且它也一樣慢。我也試圖輸出這csv,這需要約1小時。

另外,我在提交工作時並沒有真正設置跑步時間道具。我的一次作業控制檯統計信息是:

  • 活着工人:2個
  • 芯在使用中:16總,16用於
  • 使用的存儲:117.5 GB總,107.5 GB二手
  • 應用範圍:1跑步,5完成
  • 驅動程序:0運行,0已完成
  • 狀態:ALIVE
+1

合併不會拖曳到驅動程序之間,在執行程序之間洗牌,但這與您所看到的問題無關。你在使用EMR嗎?如果是這樣,請使用s3://而不是s3a://。在Spark 1.6上,您應該使用Direct OutputCommitter,就像@David所說的那樣。另一個可能的改進是將parquet.enable.summary-metadata設置爲false –

+0

在S3前面使用Alluxio能夠加速它嗎? –

回答

14

星火defaul在I/O操作期間,ts會導致大量(可能)不必要的開銷,特別是在寫入S3時。 This article對此進行了更徹底的討論,但您需要考慮更改2個設置。

  • 使用DirectParquetOutputCommitter。默認情況下,Spark會將所有數據保存到臨時文件夾,然後移動這些文件。使用DirectParquetOutputCommitter將直接書面方式向S3輸出路徑

    • No longer available in Spark 2.0+
      • 正如JIRA客票記載節省時間,目前的解決方案是
        1. 代碼切換到使用s3a和Hadoop 2.7.2+;這是更好的全面,獲取Hadoop的2.8更好,並且是s3guard
        2. 使用Hadoop的FileOutputCommitter的基礎,並設置mapreduce.fileoutputcommitter.algorithm.version 2
  • 關閉架構合併。如果啓用了模式合併,驅動程序節點將掃描所有文件以確保一致的模式。這是非常昂貴的,因爲它不是分佈式操作。確保這是做

    val file = sqx.read.option("mergeSchema", "false").parquet(path)

+2

截止到Spark 2.0 DirectParquetOutputCommitter不再可用。查看[SPARK-10063](https://issues.apache.org/jira/browse/SPARK-10063)獲取新解決方案 –

+0

@TalJoffe您是否嘗試過他們的解決方案?如果是這樣,它是如何工作的?你能回答如何? – David

+0

我確實嘗試過,效果很好。我在一個30g文件夾上做了一個小測試,性能幾乎相同 –

3

直接輸出提交者從火花代碼庫去關閉;你要在你自己的JAR中編寫你自己的/重新生成已刪除的代碼。如果您這樣做,請在工作中關閉猜測,並知道其他故障也可能導致問題,問題在於「無效數據」。 Hadoop 2.8將會添加一些S3A加速專門用於讀取S3以外的優化二進制格式(ORC,Parquet);另一方面,詳情請參閱HADOOP-11694。有些人正在努力將Amazon Dynamo用於一致的元數據存儲,這應該能夠在工作結束時進行可靠的O(1)提交。

相關問題