我試圖使用Spark 1.6.1
將parquet
文件寫入Amazon S3
。我生成的小parquet
是一次寫入~2GB
,所以它不是那麼多的數據。我試圖證明Spark
作爲我可以使用的平臺。使用Spark在s3a上向s3寫入實木複合地址文件非常慢
基本上我正在設置一個star schema
與dataframes
,然後我將這些表寫出來實木複合地板。數據來自供應商提供的csv文件,我使用Spark作爲ETL
平臺。我目前在ec2(r3.2xlarge)
有3個節點的集羣,所以120GB
的執行內存和總共16個內核。
輸入文件總計大約22GB,現在我正在提取大約2GB的數據。最終,當我開始加載完整數據集時,這將會有很多TB。
這裏是我的火花/斯卡拉pseudocode
:
def loadStage(): Unit = {
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
var sqlCtx = new SQLContext(sc)
val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")
//Setup header table/df
val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6)))
val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
header.registerTempTable("header")
sqlCtx.cacheTable("header")
//Setup fact table/df
val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
val df = sqlCtx.createDataFrame(records, factSchema)
df.registerTempTable("fact")
val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")
println(results.count())
results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
}
計大約需要2分鐘465884512行。到地板的寫操作38分鐘
我明白3210做了洗牌;但確實寫的驅動程序....但時間它的服用量是讓我覺得我做的事情嚴重錯誤。如果沒有3210,這仍然需要15分鐘,該IMO仍然太長,並給我一噸小的parquet
文件。我想每天有一個大文件,我將擁有。我的代碼也是通過字段值進行分區,並且它也一樣慢。我也試圖輸出這csv
,這需要約1小時。
另外,我在提交工作時並沒有真正設置跑步時間道具。我的一次作業控制檯統計信息是:
- 活着工人:2個
- 芯在使用中:16總,16用於
- 使用的存儲:117.5 GB總,107.5 GB二手
- 應用範圍:1跑步,5完成
- 驅動程序:0運行,0已完成
- 狀態:ALIVE
合併不會拖曳到驅動程序之間,在執行程序之間洗牌,但這與您所看到的問題無關。你在使用EMR嗎?如果是這樣,請使用s3://而不是s3a://。在Spark 1.6上,您應該使用Direct OutputCommitter,就像@David所說的那樣。另一個可能的改進是將parquet.enable.summary-metadata設置爲false –
在S3前面使用Alluxio能夠加速它嗎? –