2016-12-29 143 views
2

我對Apache Flink相對來說比較新,我正在嘗試創建一個生成AWS S3存儲桶文件的簡單項目。基於文檔,它看起來像我需要安裝Hadoop才能執行此操作。Apache Flink AWS S3 Sink是否需要Hadoop進行本地測試?

如何設置我的本地環境以允許測試此功能?我已經在本地安裝了Apache Flink和Hadoop。我已經爲Hadoop的core-site.xml配置添加了必要的更改,並且還將我的HADOOP_CONF路徑添加到了我的flink.yaml配置中。當我嘗試通過弗林克UI本地提交我的工作,我總是得到一個錯誤

2016-12-29 16:03:49,861 INFO org.apache.flink.util.NetUtils        - Unable to allocate on port 6123, due to error: Address already in use 
 
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager    - Failed to run JobManager. 
 
java.lang.RuntimeException: Unable to do further retries starting the actor system 
 
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203) 
 
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143) 
 
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040) 
 
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

我猜想,我很想念我的環境是如何設置的東西。有沒有可能在本地做到這一點?任何幫助,將不勝感激。

+0

檢查端口6123是否正在使用中。如果沒有,那麼禁用你的防火牆/ iptables。 –

回答

3

雖然您需要Hadoop庫,但您不必安裝Hadoop即可在本地運行並寫入S3。我碰巧試着寫出基於Avro模式的Parquet輸出並生成SpecificRecord到S3。我通過SBT和Intellij Idea在本地運行以下代碼的一個版本。需要的部分:

1)讓以下文件指定所需的Hadoop屬性(注意:不建議定義AWS訪問密鑰/密鑰。最好在具有適當IAM角色讀/寫的EC2實例上運行。您的S3存儲桶,但需要對當地進行測試)

<configuration> 
    <property> 
     <name>fs.s3.impl</name> 
     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> 
    </property> 

    <!-- Comma separated list of local directories used to buffer 
     large results prior to transmitting them to S3. --> 
    <property> 
     <name>fs.s3a.buffer.dir</name> 
     <value>/tmp</value> 
    </property> 

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --> 
    <property> 
     <name>fs.s3a.access.key</name> 
     <value>YOUR_ACCESS_KEY</value> 
    </property> 

    <!-- set your AWS access key --> 
    <property> 
     <name>fs.s3a.secret.key</name> 
     <value>YOUR_SECRET_KEY</value> 
    </property> 
</configuration> 

2)進口: 進口com.uebercomputing.eventrecord.EventOnlyRecord

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat 
import org.apache.flink.api.scala.{ExecutionEnvironment, _} 

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat 
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration} 
import org.apache.hadoop.fs.Path 
import org.apache.hadoop.mapreduce.Job 

import org.apache.parquet.avro.AvroParquetOutputFormat 

3)弗林克代碼使用HadoopOutputFormat以上配置:

val events: DataSet[(Void, EventOnlyRecord)] = ... 

    val hadoopConfig = getHadoopConfiguration(hadoopConfigFile) 

    val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord] 
    val outputJob = Job.getInstance 

    //Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T] 
    //so key is Void, value of type T - EventOnlyRecord in this case 
    val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
     outputFormat, 
     outputJob 
    ) 

    val outputConfig = outputJob.getConfiguration 
    outputConfig.addResource(hadoopConfig) 
    val outputPath = new Path("s3://<bucket>/<dir-prefix>") 
    FileOutputFormat.setOutputPath(outputJob, outputPath) 
    AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema) 

    events.output(hadoopOutputFormat) 

    env.execute 

    ... 

    def getHadoopConfiguration(hadoodConfigPath: String): HadoopConfiguration = { 
     val hadoopConfig = new HadoopConfiguration() 
     hadoopConfig.addResource(new Path(hadoodConfigPath)) 
     hadoopConfig 
    } 

4)結構的依賴關係和版本使用:

val awsSdkVersion = "1.7.4" 
    val hadoopVersion = "2.7.3" 
    val flinkVersion = "1.1.4" 

    val flinkDependencies = Seq(
     ("org.apache.flink" %% "flink-scala" % flinkVersion), 
     ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion) 
    ) 

    val providedFlinkDependencies = flinkDependencies.map(_ % "provided") 

    val serializationDependencies = Seq(
     ("org.apache.avro" % "avro" % "1.7.7"), 
     ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"), 
     ("org.apache.parquet" % "parquet-avro" % "1.8.1") 
    ) 

    val s3Dependencies = Seq(
     ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion), 
     ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion) 
    ) 

編輯使用writeAsText至S3:

1)創建一個Hadoop配置目錄(將引用此作爲Hadoop的CONF-DIR ),其中包含一個文件core-site.xml。

例如:

mkdir /home/<user>/hadoop-config 
cd /home/<user>/hadoop-config 
vi core-site.xml 

#content of core-site.xml 
<configuration> 
    <property> 
     <name>fs.s3.impl</name> 
     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> 
    </property> 

    <!-- Comma separated list of local directories used to buffer 
     large results prior to transmitting them to S3. --> 
    <property> 
     <name>fs.s3a.buffer.dir</name> 
     <value>/tmp</value> 
    </property> 

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --> 
    <property> 
     <name>fs.s3a.access.key</name> 
     <value>YOUR_ACCESS_KEY</value> 
    </property> 

    <!-- set your AWS access key --> 
    <property> 
     <name>fs.s3a.secret.key</name> 
     <value>YOUR_SECRET_KEY</value> 
    </property> 
</configuration> 

2)在它一個文件弗林克-conf.yaml創建一個目錄(將引用此作爲弗林克-conf的-DIR)。

例如:

mkdir /home/<user>/flink-config 
cd /home/<user>/flink-config 
vi flink-conf.yaml 

//content of flink-conf.yaml - continuing earlier example 
fs.hdfs.hadoopconf: /home/<user>/hadoop-config 

3)編輯用於運行S3弗林克作業您的IntelliJ Run配置 - 運行 - 編輯組態 - 並添加以下環境變量:

FLINK_CONF_DIR and set it to your flink-conf-dir 

Continuing the example above: 
FLINK_CONF_DIR=/home/<user>/flink-config 

4)使用該環境變量集運行代碼:

events.writeAsText("s3://<bucket>/<prefix-dir>") 

env.execute 
+0

感謝您的回覆。有沒有一種方法可以將我的本地java執行指向hadoop配置文件,而無需定義outputPath。基於文檔,似乎我應該能夠做一些類似於:messageStream.writeAsText(「s3:// ...」);但是當我通過IntelliJ運行本地執行時,它不知道該文件在哪裏。我也似乎無法找到任何flink操作,這將允許我在運行時設置它。 – medium

+0

問題是,在調用writeAsText時使用的默認HadoopFileSystem不會「瞭解」s3文件系統。請參閱上面編輯我的原始答案。 – medale

+0

所以我認爲我能夠正常工作,但是我正在訪問S3存儲桶。出現此錯誤: com.amazonaws.services.s3.model.AmazonS3Exception:狀態碼:403,AWS服務:Amazon S3,AWS請求標識:**********,AWS錯誤代碼:null,AWS錯誤消息:Forbidden,S3擴展請求ID: 我不確定爲什麼它存在訪問錯誤,因爲我的應用中使用的密鑰與創建S3存儲桶的帳戶相同。似乎現在一切都在工作。如果您有任何提示,說明我爲什麼會收到此錯誤,請告訴我。再次感謝! – medium