2016-11-03 128 views
3

我使用星火1.6.1和寫入HDFS寫入HDFS。在某些情況下,似乎所有的工作都由一個線程完成。這是爲什麼?慢實木複合地板使用的Spark

另外,我需要parquet.enable.summary元數據的拼花文件註冊到黑斑羚。

Df.write().partitionBy("COLUMN").parquet(outputFileLocation); 

它也似乎所有這一切發生在一個執行者的CPU。

16/11/03 14:59:20 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 
16/11/03 14:59:20 INFO mapred.SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_201611031459_0154_m_000029_0 
16/11/03 15:17:56 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 41.9 GB to disk (3 times so far) 
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks 
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms 
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 
16/11/03 15:21:05 INFO codec.CodecConfig: Compression: GZIP 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Dictionary is on 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Validation is off 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0 
16/11/03 15:21:05 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema: 

話又說回來: -

16/11/03 15:21:05 INFO compress.CodecPool: Got brand-new compressor [.gz] 
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Maximum partitions reached, falling back on sorting. 
16/11/03 15:32:37 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (0 time so far) 
16/11/03 15:45:47 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (1 time so far) 
16/11/03 15:48:44 INFO datasources.DynamicPartitionWriterContainer: Sorting complete. Writing out partition files one at a time. 
16/11/03 15:48:44 INFO codec.CodecConfig: Compression: GZIP 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Dictionary is on 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Validation is off 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0 
16/11/03 15:48:44 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema: 

的架構

連連以下行20倍左右的約200。

16/11/03 16:15:41 INFO output.FileOutputCommitter: Saved output of task 'attempt_201611031521_0154_m_000040_0' to hdfs://PATH/_temporary/0/task_201611031521_0154_m_000040 
16/11/03 16:15:41 INFO mapred.SparkHadoopMapRedUtil: attempt_201611031521_0154_m_000040_0: Committed 
16/11/03 16:15:41 INFO executor.Executor: Finished task 40.0 in stage 154.0 (TID 8545). 3757 bytes result sent to driver 

更新: parquet.enable.summary元數據設置爲false - :如下行

16/11/03 15:49:51 INFO hadoop.ColumnChunkPageWriteStore: written 413,231B for [a17bbfb1_2808_11e6_a4e6_77b5e8f92a4f] BINARY: 1,040,100 values, 1,138,534B raw, 412,919B comp, 8 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY], dic { 356 entries, 2,848B raw, 356B comp} 

然後在最後的

16/11/03 15:48:44 INFO compress.CodecPool: Got brand-new compressor [.gz] 
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: mem size 135,903,551 > 134,217,728: flushing 1,040,100 records to disk. 
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 89,688,651 

約200。
分區減少到21

Df.write().mode(SaveMode.Append).partitionBy("COL").parquet(outputFileLocation); 

它確實提高了速度,但仍需要一個小時才能完成。

更新: - 原因大多數問題是隻寫之前非常小的數據被物化多個左外連接。溢出發生是因爲Append模式使文件保持打開狀態。這是在此模式下5個打開文件的默認限制。您可以增加此使用屬性「spark.sql.sources.maxConcurrentWrites」

+0

你嘗試了文件的讀取過程中使用重新分區在你的代碼? –

+0

默認情況下,Spark爲每個工作人員分配一個執行程序,您可以指定您想要的執行程序的數量。你使用哪個主人(本地紗線)? – ahars

+0

spark-submit''--master''yarn-cluster''--driver-memory''8G''--driver-cores''3''--executor-memory''8G''--driver-核心「3」 - 執行程序核心「3」 - 執行程序的'4' – morfious902002

回答

0

在代碼中的一些優化後,最終到達我們得到了更好的寫次數寫入部分之前。在我們無法進行重新分配之前,由於洗牌數超過4-5 Gb。經過以前的更改,我將代碼從合併更改爲重新分區,通過向執行者中的每個CPU提供相同數量的數據寫入來將數據分發到所有執行程序中。 因此,如果您看到作業創建的地板文件大小不同,那麼在寫入之前嘗試重新分區Dataframe。

此外,這可以寫入性能也有所幫助: -

sc.hadoopConfiguration.set("parquet.enable.dictionary", "false") 
相關問題