0

我正在將我的代碼從測試實驗室集羣移至EC2集羣。我使用flintrock進行設置,我運行的是「香草」Spark 2.2.0。 目前羣集有4 c3.2xlarge節點(1主,3工人)Spark 2.2.0 S3性能

我想要處理一大羣文件,每個文件本身beeing相對較大(約1 GB)。在我的代碼中,我將文件的數量分成塊。在「實驗室」中,我發現8vCPU 13G系統的性能高達每塊大約32個文件,並將結果保存到實木複合地板。在有3名工人的EC2上,我將其翻譯成96個塊,這導致了192個任務。現在我面臨着糟糕的S3性能。我收到以下錯誤:

17/09/09 03:45:33 INFO AmazonHttpClient: Unable to execute HTTP request: Read timed out 
java.net.SocketTimeoutException: Read timed out 
    at java.net.SocketInputStream.socketRead0(Native Method) 
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
    at java.net.SocketInputStream.read(SocketInputStream.java:171) 
    at java.net.SocketInputStream.read(SocketInputStream.java:141) 
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
    at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983) 
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940) 
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
    at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) 
    at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) 
    at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) 
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) 
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261) 
    at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
    at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:259) 
    at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:209) 
    at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272) 
    at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) 
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124) 
    at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:686) 
    at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:488) 
    at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884) 
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55) 
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:384) 
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232) 
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528) 
    at com.amazonaws.services.s3.AmazonS3Client.copyObject(AmazonS3Client.java:1507) 
    at com.amazonaws.services.s3.transfer.internal.CopyCallable.copyInOneChunk(CopyCallable.java:143) 
    at com.amazonaws.services.s3.transfer.internal.CopyCallable.call(CopyCallable.java:131) 
    at com.amazonaws.services.s3.transfer.internal.CopyMonitor.copy(CopyMonitor.java:189) 
    at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:134) 
    at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:46) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 

我遵循這個指南:https://hortonworks.github.io/hdp-aws/s3-performance/

我改變sparkconf設置這樣:

conf = SparkConf().setAppName(appname)\ 
.setMaster(master)\ 
.set('spark.executor.memory','13g')\ 
.set('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version','2')\ 
.set('fs.s3a.fast.upload','true')\ 
.set('fs.s3a.fast.upload.buffer','disk')\ 
.set('fs.s3a.buffer.dir','/tmp/s3a') 

和保存這樣與S3A的實木複合地板:

df.write.parquet('s3a://mybucket/result_parquet') 

此外,我將塊大小減少到48(每個實例16個)。錯誤變少了,但仍有一些出現。但是現在由於塊大小減小,性能下降了。

現在我想知道:

一)我有沒有正確配置SparkConf()?在減少塊大小之後,這些錯誤只會顯着減少。

b)如果S3性能受到每個EC2實例「請求」的限制。因此,如果我使用3箇中等實例而不是6個較小的S3,那麼S3能夠更好地處理192個parquet寫入任務,只是因爲它們來自更多實例?

回答

1
  1. 您到S3的帶寬取決於您租用的VM類型:網絡越好,帶寬越多。
  2. 您被限制寫入特定的存儲桶或存儲分區的分區,其中S3將發送503響應以供客戶端處理。
  3. 但是你在這裏看到的是作爲一個套接字異常浮出水面;看起來像AWS xfer管理器沒有處理它。

如果在任務發生的提交,這是因爲S3A模仿重命名()與複製+刪除,輸出提交者希望重命名是一個O(1)原子操作,而不是一個緩慢的。更關鍵的是,重命名的模仿依賴於S3中的列表文件,並且由於s3最終一致,可能會錯過列表中的文件。 除非你有一個一致性層頂上S3(s3mper,s3guard)數據就會丟失

,你應該提交到本地HDFS進行查詢的任何序列,複製最多S3時,你有最後的工作。或者,如果您有時間,請幫助測試HADOOP-13786,其中添加了0重命名提交

+0

感謝您的輸入。我可能會創建一個HDFS緩存。當我將HFDS中的內容複製到S3時,有一個問題難以解決,那麼丟失數據的風險仍然存在? – Thagor

+0

否:應用程序獲取HDFS數據的一致列表,他們用它來複制 –