我在Google Dataproc上運行Spark羣集,並且在嘗試使用sparkContext.textFile(...)
從FTP讀取GZipped文件時遇到一些問題。無法通過FTP使用Google Dataproc上的SparkContext.textFile(...)讀取文件
我運行的代碼是:
object SparkFtpTest extends App {
val file = "ftp://username:[email protected]:21/filename.txt.gz"
val lines = sc.textFile(file)
lines.saveAsTextFile("gs://my-bucket-storage/tmp123")
}
,我得到的錯誤是:
Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication.
我看到一些人建議的憑據是錯誤的,所以我試着進入錯誤的憑據和錯誤是不同的,即無效的登錄憑據。
如果我將該URL複製到瀏覽器中 - 該文件正在被正確下載,它也可以使用。
另外值得一提的是,我已經嘗試過使用Apache commons-net庫(與Spark-2.2中的版本相同)並且工作正常 - 我能夠將數據(來自Master和Worker節點) 。我無法解壓縮它(通過使用Java的GZipInputStream;我不記得失敗,但如果你認爲這很重要,我可以嘗試重現它)。我認爲這表明這不是羣集上的某個防火牆問題,儘管我無法使用curl
下載該文件。
我想我幾個月前從本地機器運行相同的代碼,如果我沒記錯的話,它工作得很好。
你有什麼想法是什麼導致這個問題? 難道這是某種依賴衝突問題,如果是這樣的話?
我在項目中有幾個依賴關係,例如google-sdk,solrj ......但是,如果是依賴關係問題,我希望看到類似ClassNotFoundException
或NoSuchMethodError
的東西。
整個堆棧跟蹤看起來是這樣的:
16/12/05 23:53:46 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/
16/12/05 23:53:47 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/ - removing from cache
16/12/05 23:53:49 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/0/
16/12/05 23:53:50 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/0/ - removing from cache
16/12/05 23:53:50 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/
16/12/05 23:53:51 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/ - removing from cache
Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication.
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:298)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:495)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:537)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:586)
at org.apache.commons.net.ftp.FTP.quit(FTP.java:794)
at org.apache.commons.net.ftp.FTPClient.logout(FTPClient.java:788)
at org.apache.hadoop.fs.ftp.FTPFileSystem.disconnect(FTPFileSystem.java:151)
at org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:395)
at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1701)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1647)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:222)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1219)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1161)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1064)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1030)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:956)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:955)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1459)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1438)
你正在使用什麼Dataproc圖像版本? –
這是我用來創建集羣的腳本,所以我期望成爲最新的(1.1): 'gcloud數據集羣創建$ cluster_name --zone $ zone --num-workers $ num_workers --initialization-actions $ init_actions - master-machine-type $ master_machine_type --worker-machine-type $ worker_machine_type --scopes datastore' –