While you do need the Hadoop libraries, you do not have to have Hadoop installed to run locally and write to S3. I happened to try this out writing Parquet output based on an Avro schema, with generated SpecificRecord classes, to S3. I ran a version of the following code locally through SBT and IntelliJ IDEA. The needed parts:
1) Have the following file specifying the required Hadoop properties (note: defining the AWS access key/secret key is not recommended; it is better to run on an EC2 instance that has the proper IAM role to read/write your S3 bucket, but the keys are needed for local testing). A quick local sanity-check sketch follows the XML.
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>

  <!-- Comma separated list of local directories used to buffer
       large results prior to transmitting them to S3. -->
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>

  <!-- set your AWS access key (key name defined in org.apache.hadoop.fs.s3a.Constants) -->
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>

  <!-- set your AWS secret key -->
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
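As a quick local sanity check (not part of the original answer), a minimal sketch like the following can confirm that the core-site.xml above is picked up and that an S3A FileSystem can be instantiated. The file path and bucket name are hypothetical placeholders:

import java.net.URI

import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}

object S3aConfigCheck {
  def main(args: Array[String]): Unit = {
    // Load the core-site.xml shown above (hypothetical path).
    val conf = new HadoopConfiguration()
    conf.addResource(new Path("/home/<user>/hadoop-config/core-site.xml"))

    // Instantiating the S3A filesystem verifies that hadoop-aws and the AWS SDK
    // are on the classpath and that the credentials/properties resolve.
    val fs = FileSystem.get(new URI("s3a://<bucket>/"), conf)
    println(s"Loaded filesystem: ${fs.getUri}")
  }
}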
2) Imports:
import com.uebercomputing.eventrecord.EventOnlyRecord
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroParquetOutputFormat
3) Flink code that uses the HadoopOutputFormat with the configuration above:
val events: DataSet[(Void, EventOnlyRecord)] = ...

val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)

val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
val outputJob = Job.getInstance

//Note: AvroParquetOutputFormat extends FileOutputFormat[Void, T],
//so the key is Void and the value is of type T - EventOnlyRecord in this case
val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
  outputFormat,
  outputJob
)

val outputConfig = outputJob.getConfiguration
outputConfig.addResource(hadoopConfig)
val outputPath = new Path("s3://<bucket>/<dir-prefix>")
FileOutputFormat.setOutputPath(outputJob, outputPath)
AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)

events.output(hadoopOutputFormat)

env.execute

...

def getHadoopConfiguration(hadoopConfigPath: String): HadoopConfiguration = {
  val hadoopConfig = new HadoopConfiguration()
  hadoopConfig.addResource(new Path(hadoopConfigPath))
  hadoopConfig
}
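The events DataSet above is elided in the original. As a hedged illustration (the records DataSet and its source are assumptions), pairing each record with a null Void key gives the (key, value) shape that HadoopOutputFormat expects:

// Sketch only: `records` is an assumed DataSet[EventOnlyRecord] built elsewhere.
// AvroParquetOutputFormat writes FileOutputFormat[Void, T], so the key is always null.
val events: DataSet[(Void, EventOnlyRecord)] =
  records.map(record => (null.asInstanceOf[Void], record))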
4) Build dependencies and versions used:
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"
val flinkVersion = "1.1.4"

val flinkDependencies = Seq(
  ("org.apache.flink" %% "flink-scala" % flinkVersion),
  ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
)

val providedFlinkDependencies = flinkDependencies.map(_ % "provided")

val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1")
)

val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)
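For completeness, one way these dependency groups might be wired into build.sbt (an assumption, not shown in the original answer):

// Sketch: combine the Seqs defined above into the project's library dependencies.
libraryDependencies ++=
  providedFlinkDependencies ++ serializationDependencies ++ s3Dependencies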
Edit: To use writeAsText to S3:
1) Create a Hadoop configuration directory (referred to below as hadoop-conf-dir) with a file core-site.xml in it.
For example:
mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml
#content of core-site.xml
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>

  <!-- Comma separated list of local directories used to buffer
       large results prior to transmitting them to S3. -->
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>

  <!-- set your AWS access key (key name defined in org.apache.hadoop.fs.s3a.Constants) -->
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>

  <!-- set your AWS secret key -->
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
2) Create a directory (referred to below as flink-conf-dir) with a file flink-conf.yaml in it.
For example:
mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml
#content of flink-conf.yaml - continuing earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config
3) Edit the IntelliJ Run configuration used to run your Flink S3 job - Run - Edit configurations - and add the following environment variable:
FLINK_CONF_DIR and set it to your flink-conf-dir
Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config
4) Run the code with that environment variable set:
events.writeAsText("s3://<bucket>/<prefix-dir>")
env.execute
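Putting step 4 together, a minimal self-contained sketch (the object name, toy elements, and bucket placeholder are assumptions) might look like:

import org.apache.flink.api.scala._

object WriteTextToS3 {
  def main(args: Array[String]): Unit = {
    // FLINK_CONF_DIR must point at the flink-conf-dir from step 2 so that
    // fs.hdfs.hadoopconf (and thus core-site.xml) is picked up.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Toy data standing in for the real events DataSet.
    val events: DataSet[String] = env.fromElements("event-1", "event-2")

    events.writeAsText("s3://<bucket>/<prefix-dir>")
    env.execute("write-text-to-s3")
  }
}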
Check whether port 6123 is in use. If it is not, then disable your firewall/iptables.