2016-12-06

Unable to read a file over FTP with SparkContext.textFile(...) on Google Dataproc

I'm running a Spark cluster on Google Dataproc and I'm running into problems when trying to read a GZipped file from FTP using sparkContext.textFile(...).

The code I'm running is:

import org.apache.spark.{SparkConf, SparkContext}

object SparkFtpTest extends App {
    // a SparkContext is not provided implicitly inside an App object, so create one
    val sc = new SparkContext(new SparkConf().setAppName("SparkFtpTest"))
    val file = "ftp://username:[email protected]:21/filename.txt.gz"
    val lines = sc.textFile(file)
    lines.saveAsTextFile("gs://my-bucket-storage/tmp123")
}

and the error I get is:

Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication. 

I've seen some people suggest that the credentials are wrong, so I tried entering wrong credentials on purpose, and the error was different, namely invalid login credentials.

If I copy that URL into a browser, the file is downloaded correctly and it is usable.

It's also worth mentioning that I've tried using the Apache commons-net library directly (the same version as in Spark-2.2) and it worked fine - I was able to download the data (from both the master and the worker nodes). I wasn't able to decompress it (using Java's GZipInputStream; I can't remember the failure, but I can try to reproduce it if you think it's important). I think this indicates that it isn't some firewall issue on the cluster, although I wasn't able to download the file with curl.
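Roughly, the manual commons-net check that worked looked something like this (a sketch only; the host, credentials and path are placeholders, not the exact code I ran):

import java.io.FileOutputStream
import org.apache.commons.net.ftp.{FTP, FTPClient}

// Sketch of the connectivity check: download the gzipped file over FTP to local disk.
object FtpManualCheck extends App {
    val client = new FTPClient()
    client.connect("ftp.mysite.com", 21)            // placeholder host
    client.login("username", "password")            // placeholder credentials
    client.enterLocalPassiveMode()                  // passive mode tends to work better behind NAT/firewalls
    client.setFileType(FTP.BINARY_FILE_TYPE)        // binary mode so the gzip bytes are not altered
    val out = new FileOutputStream("/tmp/filename.txt.gz")
    val ok = client.retrieveFile("/filename.txt.gz", out)  // returns true on success
    out.close()
    println(s"download succeeded: $ok")
    client.logout()
    client.disconnect()
}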

I believe I ran the same code from my local machine a few months ago and, if I remember correctly, it worked fine.

Do you have any idea what is causing this problem? Could it be some kind of dependency conflict, and if so, which one?

I have several dependencies in the project, e.g. google-sdk, solrj... However, if it were a dependency problem, I would expect to see something like a ClassNotFoundException or a NoSuchMethodError.

The whole stack trace looks like this:

16/12/05 23:53:46 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/ 
16/12/05 23:53:47 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/ - removing from cache 
16/12/05 23:53:49 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/0/ 
16/12/05 23:53:50 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/0/ - removing from cache 
16/12/05 23:53:50 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/ 
16/12/05 23:53:51 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/ - removing from cache 
Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication. 
    at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:298) 
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:495) 
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:537) 
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:586) 
    at org.apache.commons.net.ftp.FTP.quit(FTP.java:794) 
    at org.apache.commons.net.ftp.FTPClient.logout(FTPClient.java:788) 
    at org.apache.hadoop.fs.ftp.FTPFileSystem.disconnect(FTPFileSystem.java:151) 
    at org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:395) 
    at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1701) 
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1647) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:222) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1219) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1161) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1064) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1030) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:956) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:955) 
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1459) 
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438) 
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1438) 

What Dataproc image version are you using? –


This is the script I use to create the cluster, so I would expect it to be the latest (1.1): 'gcloud dataproc clusters create $cluster_name --zone $zone --num-workers $num_workers --initialization-actions $init_actions --master-machine-type $master_machine_type --worker-machine-type $worker_machine_type --scopes datastore' –

Answer


It looks like this may be a known, unresolved issue in Spark/Hadoop: https://issues.apache.org/jira/browse/HADOOP-11886 and https://github.com/databricks/learning-spark/issues/21 both hint at similar stack traces.

If you were able to use the Apache commons-net library manually, you could achieve the same effect as sc.textFile by obtaining the list of files, parallelizing that list as an RDD, and using flatMap, where each task takes a file name and reads the file line by line, producing the output collection of lines for each file. Alternatively, if the amount of data on the FTP server is small (maybe up to 10 GB or so), parallel reads won't help much compared to copying it with a single thread from the FTP server onto HDFS or GCS in your Dataproc cluster, and then processing it in your Spark job using HDFS or GCS paths.
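For illustration, a rough sketch of the flatMap approach; the readFtpGzip helper, host, credentials and file list below are placeholders, and the gzip decompression is assumed to be done by hand inside each task:

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.GZIPInputStream
import org.apache.commons.net.ftp.{FTP, FTPClient}

// Hypothetical helper: open one gzipped file over FTP and return all of its lines.
def readFtpGzip(host: String, user: String, pass: String, path: String): Seq[String] = {
    val client = new FTPClient()
    client.connect(host, 21)
    client.login(user, pass)
    client.enterLocalPassiveMode()
    client.setFileType(FTP.BINARY_FILE_TYPE)
    val reader = new BufferedReader(
        new InputStreamReader(new GZIPInputStream(client.retrieveFileStream(path))))
    val lines = Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    reader.close()
    client.completePendingCommand()   // finalize the streaming transfer
    client.logout()
    client.disconnect()
    lines
}

// Parallelize the file names so that each Spark task reads one file over FTP.
val fileNames = Seq("/data/file1.txt.gz", "/data/file2.txt.gz")   // placeholder listing
val lines = sc.parallelize(fileNames)
    .flatMap(readFtpGzip("ftp.mysite.com", "username", "password", _))
lines.saveAsTextFile("gs://my-bucket-storage/tmp123")

For the second alternative, the same kind of helper could simply copy the files onto HDFS or GCS first; the Spark job would then read those paths with sc.textFile, which also handles the .gz decompression automatically.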


Thanks Dennis. It was really bugging me, but it makes sense now. I implemented the manual solution by getting the list of files. However, since all the files are small, I like the alternative solution you offered even more. –