1

我想使用Scala在Spark Streaming中將RDD[String]寫入Amazon S3。這些基本上是JSON字符串。不知道如何更有效地做到這一點。 我找到了this post,其中使用了庫spark-s3。這個想法是創建SparkContext然後SQLContext。在此之後,帖子的作者確實是這樣的:如何將流數據寫入S3?

myDstream.foreachRDD { rdd => 
     rdd.toDF().write 
       .format("com.knoldus.spark.s3") 
       .option("accessKey","s3_access_key") 
       .option("secretKey","s3_secret_key") 
       .option("bucket","bucket_name") 
       .option("fileType","json") 
       .save("sample.json") 
} 

什麼是除了spark-s3其他的選擇嗎?是否可以將S3上的文件追加到流數據中?

回答

1

你應該在Spark Documentation看看成dataframewriter模式的方法:

public DataFrameWriter mode(SaveMode saveMode)

指定當數據或表已經存在的行爲。選項 包括: - SaveMode.Overwrite:覆蓋現有數據。 - SaveMode.Append:追加數據。 - SaveMode.Ignore:忽略 操作(即no-op)。 - SaveMode.ErrorIfExists:默認選項, 在運行時拋出異常。

你可以試試像這樣的事情追加 savemode。

rdd.toDF.write 
     .format("json") 
     .mode(SaveMode.Append) 
     .saveAsTextFile("s3://iiiii/ttttt.json"); 

Spark Append:

追加模式意味着節約了數據幀到數據源的情況下,如果數據 /表已經存在,則數據幀的內容被預期 被附加到現有的數據。

基本上可以通過使「格式」關鍵字方法

public DataFrameWriter format(java.lang.String source)

指定基礎輸出數據源選擇要作爲輸出格式的格式。內置選項包括「parquet」,「json」等。

如爲parquet

df.write().format("parquet").save("yourfile.parquet")

json

df.write().format("json").save("yourfile.json")


編輯:約S3憑據添加細節:

有兩種不同的選擇如何設置憑證,我們可以在SparkHadoopUtil.scala 與環境變量System.getenv("AWS_ACCESS_KEY_ID")spark.hadoop.foo屬性看到:

SparkHadoopUtil.scala: 
if (key.startsWith("spark.hadoop.")) { 
      hadoopConf.set(key.substring("spark.hadoop.".length), value) 
} 

所以,你需要得到hadoopConfigurationjavaSparkContext.hadoopConfiguration()scalaSparkContext.hadoopConfiguration並設置

hadoopConfiguration.set("fs.s3.awsAccessKeyId", myAccessKey) 
hadoopConfiguration.set("fs.s3.awsSecretAccessKey", mySecretKey) 
+0

我是否正確理解清潔選項是使用Spark的'saveAsTextFile',而不是使用'spark-s3'? – Lobsterrrr

+0

在您的第一個示例中,我應該在哪裏放置Amazon訪問密鑰並通過? – Lobsterrrr

+0

1,@Lobsterrrr我認爲'saveAsTextFile'是由spark api提供的。 2,並且@jbird注意到它實際上不是逐字追加的。添加大數據對我來說沒有什麼意義 - 更好的方法是創建分區,例如 – VladoDemcak

2

S3上的文件cannot be appended。 S3中的「追加」意味着用包含附加數據的新對象替換現有對象。

+0

這是一個好點:讓我們假設文件已經存在於S3中。 Spark表示數據將被追加。這是否意味着spark會運行幾個進程:類似於create_new_data_file - > get_existing_file - > merge_files - > replace_file_on_S3?它對我來說看起來不像原子操作。如果smth出問題了 - 會發生什麼? –