2017-02-16 118 views
1

所以這個問題一直在推動我瘋了,它開始覺得像s3的火花不是這個具體工作的正確工具。基本上,我在s3存儲桶中擁有數百萬個較小的文件。由於我無法理解的原因,這些文件無法合併(一個是唯一的加密轉錄本)。我已經看到類似的問題,每一個解決方案都沒有產生好的結果。我想的第一件事是通配符:如何處理數百萬個較小的s3文件與apache的火花

sc.wholeTextFiles(s3aPath + "/*/*/*/*.txt").count(); 

注:計數是它需要多長時間來處理文件的詳細調試。這項工作幾乎花了整整一天的時間超過10個實例,但仍然失敗,發佈在列表底部的錯誤。然後我發現這個鏈接,它基本上說這不是最優的:https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html

然後,我決定嘗試另一種解決方案,目前我找不到,這表示加載所有路徑,然後聯合所有在RDDS

ObjectListing objectListing = s3Client.listObjects(bucket); 
    List<JavaPairRDD<String, String>> rdds = new ArrayList<>(); 
    List<JavaPairRDD<String, String>> tempMeta = new ArrayList<>(); 
    //initializes objectListing 
    tempMeta.addAll(objectListing.getObjectSummaries().stream() 
      .map(func) 
      .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript")) 
      .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName())) 
      .collect(Collectors.toList())); 

    while(objectListing.isTruncated()) { 
     objectListing = s3Client.listNextBatchOfObjects(objectListing); 
     tempMeta.addAll(objectListing.getObjectSummaries().stream() 
       .map(func) 
       .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript")) 
       .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName())) 
       .collect(Collectors.toList())); 
     if (tempMeta.size() > 5000) { 
      rdds.addAll(tempMeta); 
      tempMeta = new ArrayList<>(); 
     } 
    } 

    if (!tempMeta.isEmpty()){ 
     rdds.addAll(tempMeta); 
    } 
    return SparkConfig.getSparkContext().union(rdds.get(0), rdds.subList(1, rdds.size())); 

然後,即使當我設置設置emrfs現場配置到:

{ 
    "Classification": "emrfs-site", 
    "Properties": { 
     "fs.s3.consistent.retryPolicyType": "fixed", 
     "fs.s3.consistent.retryPeriodSeconds": "15", 
     "fs.s3.consistent.retryCount": "20", 
     "fs.s3.enableServerSideEncryption": "true", 
     "fs.s3.consistent": "false" 
    } 
} 

我6個小時每次我試圖運行的工作時間內得到這個錯誤

17/02/15 19:15:41 INFO AmazonHttpClient: Unable to execute HTTP request: randomBucket.s3.amazonaws.com:443 failed to respond 
org.apache.http.NoHttpResponseException: randomBucket.s3.amazonaws.com:443 failed to respond 

所以首先,有沒有辦法使用s3的火花使用較小的文件?我不在乎解決方案是否不理想,我只是想嘗試一些工作。我想過嘗試使用Spark流,因爲它的內部結構與加載所有文件有點不同。然後我會使用fileStream並將newFiles設置爲false。然後我可以批量處理它們。但是,這不是火花流傳輸的原因,所以我在走這條路線時遇到了衝突。作爲一個便箋,我將數百萬個小文件生成爲hdfs,並嘗試了相同的工作,並在一個小時內完成。這讓我覺得它是特定的s3。另外,我正在使用s3a,而不是普通的s3。

回答

4

如果您使用的是亞馬遜EMR,那麼您需要使用s3://網址; s3a://是爲ASF發佈的。

一個大問題是在s3中列出目錄樹需要多長時間,特別是遞歸樹遍歷。 spark代碼假定它是一個快速文件系統,其中列出目錄和說明文件的成本很低,而實際上每個操作都需要1-4個HTTPS請求,即使在重用的HTTP/1.1連接上也會受到傷害。它可以如此緩慢,你can see the pauses in the log

這真的很讓人傷心,因爲它是大量延遲發生的前端分區,所以這是一系列的工作,它正在受到譴責。

雖然有一些加速在S3a的樹遍歷在Hadoop中2.8來作爲the S3a phase II work部分的/ /*.txt形式通配符掃描是不會得到任何的加速。我的建議是嘗試將您的目錄結構扁平化,以便您可以從深度樹移到淺層,甚至可以放在同一個目錄中,這樣就可以不經過漫遊就掃描它,代價爲每5000個條目有1個HTTP請求。

請記住,無論如何,很多小文件都非常昂貴,包括在HDFS中使用存儲的地方。有一個特殊的彙總格式,HAR文件,除了hadoop,hive和spark都可以在文件內部工作外,它們就像tar文件一樣。那可能幫助,雖然我沒有看到任何實際的性能測試數字。

+0

很好的答案,謝謝史蒂夫。我今天會試驗它。 –

+0

沒問題。出於好奇 - 並幫助塑造未來的性能測試 - 目錄分區是什麼?像YYYY/MM/DD,如2017/01/23 /? –