Sparklyr connecting to an S3 bucket throws an error
2017-07-19 · 87 views

I want to connect to an S3 bucket from R using sparklyr. I can read local files into the Spark context without problems, but connecting to S3 appears to be the issue and throws a whole pile of errors. The code I used is listed below.

Note: the S3 bucket contains multiple CSV files, all of which follow the same schema.

library(sparklyr)
library(tidyverse)

# Install a local Spark 2.0.2 distribution built against Hadoop 2.7 (one-off)
sparklyr::spark_install(version = "2.0.2", hadoop_version = "2.7")

# AWS credentials and region
Sys.setenv(AWS_ACCESS_KEY_ID = "xxxx")
Sys.setenv(AWS_SECRET_ACCESS_KEY = "xxxx")
Sys.setenv(AWS_DEFAULT_REGION = "ap-southeast-1")

Spark_config <- sparklyr::spark_config()
sc <- sparklyr::spark_connect(master = "local", config = Spark_config)

files <- "s3n://temp-sg/MVC"
temp <- spark_read_csv(sc, name = "MVC", path = files, infer_schema = TRUE)
spark_disconnect(sc)

Any help here is much appreciated.

Below is the error dump when using s3a://:

Error: java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3a: 
    at org.apache.hadoop.fs.Path.initialize(Path.java:206) 
    at org.apache.hadoop.fs.Path.<init>(Path.java:172) 
    at org.apache.hadoop.fs.Path.<init>(Path.java:94) 
    at org.apache.hadoop.fs.Globber.glob(Globber.java:211) 
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644) 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.take(RDD.scala:1302) 
    at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:249) 
    at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:245) 
    at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:223) 
    at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:72) 
    at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:157) 
    at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:44) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
    at java.lang.reflect.Method.invoke(Unknown Source) 
    at sparklyr.Invoke$.invoke(invoke.scala:94) 
    at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89) 
    at sparklyr.StreamHandler$.read(stream.scala:55) 
    at sparklyr.BackendHandler.channelRead0(handler.scala:49) 
    at sparklyr.BackendHandler.channelRead0(handler.scala:14) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3a: 
    at java.net.URI$Parser.fail(Unknown Source) 
    at java.net.URI$Parser.failExpecting(Unknown Source) 
    at java.net.URI$Parser.parse(Unknown Source) 
    at java.net.URI.<init>(Unknown Source) 
    at org.apache.hadoop.fs.Path.initialize(Path.java:203) 
    ... 58 more 

And the error dump when using s3n://:

Error: java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3n: 
     at org.apache.hadoop.fs.Path.initialize(Path.java:206) 
     at org.apache.hadoop.fs.Path.<init>(Path.java:172) 
     at org.apache.hadoop.fs.Path.<init>(Path.java:94) 
     at org.apache.hadoop.fs.Globber.glob(Globber.java:211) 
     at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644) 
     at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257) 
     at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) 
     at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) 
     at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
     at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
     at org.apache.spark.rdd.RDD.take(RDD.scala:1302) 
     at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:249) 
     at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:245) 
     at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:223) 
     at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:72) 
     at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:157) 
     at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:44) 
     at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158) 
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119) 
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
     at java.lang.reflect.Method.invoke(Unknown Source) 
     at sparklyr.Invoke$.invoke(invoke.scala:94) 
     at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89) 
     at sparklyr.StreamHandler$.read(stream.scala:55) 
     at sparklyr.BackendHandler.channelRead0(handler.scala:49) 
     at sparklyr.BackendHandler.channelRead0(handler.scala:14) 
     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) 
     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) 
     at java.lang.Thread.run(Unknown Source) 
    Caused by: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3n: 
     at java.net.URI$Parser.fail(Unknown Source) 
     at java.net.URI$Parser.failExpecting(Unknown Source) 
     at java.net.URI$Parser.parse(Unknown Source) 
     at java.net.URI.<init>(Unknown Source) 
     at org.apache.hadoop.fs.Path.initialize(Path.java:203) 
     ... 58 more 

Can you post your error message? At least the first few lines after things go wrong. – JanLauGe


Updated the question with the error logs for s3a and s3n respectively. –

Answers

0

Fixed the issue; the code snippet is below. Note: make sure the correct JVM is being used. I had to run a 32-bit JVM on my 64-bit machine because the 64-bit one would not work (a quick check for this is sketched right below). Spark version: 2.0, Hadoop version: 2.7.
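
As an aside (not part of the original answer): a quick way to check which JVM R and sparklyr will actually pick up, assuming java is on the PATH:

# Not from the original fix: sanity-check the active JVM.
# JAVA_HOME (if set) takes precedence; `java -version` reports the version
# and whether it is a 32-bit or 64-bit build.
Sys.getenv("JAVA_HOME")
system("java -version")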

# install.packages("devtools")
# devtools::install_github("rstudio/sparklyr")

library(sparklyr)
library(dplyr)

# Pull in the hadoop-aws connector so the s3a:// filesystem is available
conf <- spark_config()
conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
# conf$spark.executor.memory <- "4g"   # optional

sc <- spark_connect(master = "local", config = conf)

# Get the underlying SparkContext
ctx <- sparklyr::spark_context(sc)

# Wrap it in a JavaSparkContext so we can reach the Hadoop configuration
jsc <- invoke_static(
    sc,
    "org.apache.spark.api.java.JavaSparkContext",
    "fromSparkContext",
    ctx
)

# Set the S3 credentials on the Hadoop configuration
hconf <- jsc %>% invoke("hadoopConfiguration")
hconf %>% invoke("set", "fs.s3a.access.key", "xxxx")
hconf %>% invoke("set", "fs.s3a.secret.key", "xxxx")

# Check that the Spark connection is still open
sparklyr::spark_connection_is_open(sc = sc)

small_file <- "s3a://temp-sg/MVC"

temp <- spark_read_csv(sc, name = "MVC", path = small_file, infer_schema = TRUE)
spark_disconnect(sc)
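
As a side note, and my own sketch rather than part of the accepted fix: the same s3a settings can also be supplied through spark_config() before connecting, since Spark forwards any spark.hadoop.* property into the Hadoop configuration, which avoids the manual invoke calls:

library(sparklyr)

# Sketch only: pass the hadoop-aws package and s3a credentials via the config.
conf <- spark_config()
conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
# spark.hadoop.* properties are copied into the Hadoop configuration at startup
conf$spark.hadoop.fs.s3a.access.key <- "xxxx"   # placeholder credentials
conf$spark.hadoop.fs.s3a.secret.key <- "xxxx"

sc <- spark_connect(master = "local", config = conf)
temp <- spark_read_csv(sc, name = "MVC", path = "s3a://temp-sg/MVC", infer_schema = TRUE)
spark_disconnect(sc)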

Thanks for the update! Ideally, click the tick mark on the left to accept your own answer. – JanLauGe

0

It is hard to say exactly what went wrong without seeing your exact error message. One thing I did notice, though, is that you are using s3n rather than s3a. Why is that? I would suggest trying s3a instead:

files <- 's3a://temp-sg/MVC' 
temp <- spark_read_csv(sc, 
    name = 'MVC', 
    path = files, 
    infer_schema = TRUE) 

See this post for more details on the difference between the two.


Tried those options as well, but they did not seem to work. –