2017-01-09 613 views
1

我有一個小型測試項目來將數據推送到S3存儲桶。但是,它看起來像我沒有讀取core-site.xml文件,因爲我收到錯誤java.io.IOException: No file system found with scheme s3a。我如何正確讀取core-site.xml文件並將數據推送到S3?用Apache Flink將數據推送到S3

這是代碼:

public class S3Sink { 
public static void main(String[] args) throws Exception { 
    Map<String, String> configs = ConfigUtils.loadConfigs(「path/to/config.yaml"); 

    final ParameterTool parameterTool = ParameterTool.fromMap(configs); 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.getConfig().disableSysoutLogging(); 
    env.getConfig().setGlobalJobParameters(parameterTool); 

    DataStream<String> messageStream = env 
      .addSource(new FlinkKafkaConsumer09<String>(
        parameterTool.getRequired("kafka.topic"), 
        new SimpleStringSchema(), 
        parameterTool.getProperties())); 

    String id = UUID.randomUUID().toString(); 
    messageStream.writeAsText("s3a://flink-test/" + id + ".txt").setParallelism(1); 

    env.execute(); 
} 

這是弗林克-conf.yaml文件中的配置變化來引用核心site.xml文件:

fs.hdfs.hadoopconf: /path/to/core-site/etc/hadoop/ 

這是我的核心-site.xml:

<configuration> 
<property> 
    <name>fs.defaultFS</name> 
    <value>hdfs://localhost:9000</value> 
</property> 
<property> 
    <name>fs.s3.impl</name> 
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> 
</property> 

<!-- Comma separated list of local directories used to buffer 
    large results prior to transmitting them to S3. --> 
<property> 
    <name>fs.s3a.buffer.dir</name> 
    <value>/tmp</value> 
</property> 

<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --> 
<property> 
    <name>fs.s3a.awsAccessKeyId</name> 
    <value>*****</value> 
</property> 
<!-- set your AWS access key --> 
<property> 
    <name>fs.s3a.awsSecretAccessKey</name> 
    <value>*****</value> 
</property> 

+0

如果將'fs.hdfs.hadoopconf'設置爲連續'core-site.xml'文件夾,那麼它會起作用嗎?還要確保'$ HADOOP_HOME'環境變量設置正確。 –

+0

我正在使用IntelliJ並將環境變量HADOOP_HOME設置爲core-site.xml路徑。我在本地運行程序,因此fs.hdfs.hadoopconf設置不起作用。 – Sam

回答

1

core-site.xml文件未被讀入的原因是由於Hadoop的文件結構。我有HADOOP_HOME=path/to/dir/etc/hadoop。但是,Hadoop會將etc/hadoop作爲其文件結構的一部分來查找core-site.xml。要在HADOOP_HOME環境變量中正確讀取路徑,應該將其列爲HADOOP_HOME=path/to/dir

另一個問題是數據沒有推到S3的原因。這是因爲我正在使用流處理。批量處理的作用是將數據推送到S3,但流處理並不是因爲S3如何將數據存儲爲鍵/值存儲,並且新數據只能被替換。對於流處理,Flink會將數據附加到S3不允許的數據上,因此不會將數據推送到S3。所以這段代碼適用於將批量推送到S3

ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment(); 
    DataSet dataSet = ee.readTextFile("/Users/name/Desktop/flinkoutputtest.txt"); 
    dataSet.writeAsText("s3://flink-test/flink-output/testdoc.txt").setParallelism(1); 
    ee.execute();