以下是方面一些要點我怎麼會有事的設置:AWS Glue to Redshift:是否可以替換,更新或刪除數據?
- 我已經上傳到S3的CSV文件和膠履帶設置來創建表和模式。
- 我有一個膠水作業設置,它使用JDBC連接將膠水錶中的數據寫入Amazon Redshift數據庫。該工作還負責映射列和創建紅移表。
通過重新運行一項工作,我得到了redshift中的重複行(如預期的那樣)。但是,在插入新數據之前,是否有辦法替換或刪除行,使用密鑰或膠水中的分區設置?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields
from pyspark.sql.functions import lit
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
columnMapping = [
("id", "int", "id", "int"),
("name", "string", "name", "string"),
]
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")
df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")
## Write data to redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")
job.commit()
好問題,現在遇到同樣的問題。你到目前爲止取得了哪些進展? – Matthijs
我與AWS Glue支持部門聯繫並能夠解決問題。它沒有出現膠水有辦法做到這一點,或從來沒有爲這種類型的工作。我能夠得到一個工作解決方案的方式是讓膠水將所有行插入暫存表中,然後在膠水外執行上/合併。 – krchun